1
1
from __future__ import annotations
2
2
from pathlib import Path , WindowsPath
3
- from typing import Union
3
+ from typing import Union , Generator
4
4
import os
5
5
import sys
6
6
import datetime
7
7
import json
8
8
from copy import deepcopy
9
9
import importlib
10
10
from math import prod
11
+ from collections import namedtuple
11
12
12
13
import numpy as np
13
14
@@ -183,6 +184,75 @@ def is_dict_extractor(d: dict) -> bool:
183
184
return is_extractor
184
185
185
186
187
# Leaf element produced while walking an extractor dict:
#   value       : the leaf object itself
#   name        : the closest enclosing dict key (list items inherit the key of their list)
#   access_path : tuple of dict keys / list indices leading from the root to the leaf
extractor_dict_element = namedtuple(
    typename="extractor_dict_element",
    field_names=["value", "name", "access_path"],
)


def extractor_dict_iterator(extractor_dict: dict) -> Generator[extractor_dict_element, None, None]:
    """
    Iterator for recursive traversal of a dictionary.
    This function explores the dictionary recursively and yields the path to each value
    along with the value itself.

    By path here we mean the keys that lead to the value in the dictionary:
    e.g. for the dictionary {'a': {'b': 1}}, the path to the value 1 is ('a', 'b').

    Only `dict` and `list` containers are descended into; every other object
    (including tuples) is yielded as a leaf. Empty dicts and empty lists yield nothing.

    See `BaseExtractor.to_dict()` for a description of `extractor_dict` structure.

    Parameters
    ----------
    extractor_dict : dict
        Input dictionary

    Yields
    ------
    extractor_dict_element
        Named tuple containing the value, the name, and the access_path to the value in the dictionary.
        For values stored inside a list, `name` is the key of the enclosing list and
        `access_path` contains the integer index of the value in that list.

    """

    def _extractor_dict_iterator(dict_list_or_value, access_path=(), name=""):
        if isinstance(dict_list_or_value, dict):
            for k, v in dict_list_or_value.items():
                yield from _extractor_dict_iterator(v, access_path + (k,), name=k)
        elif isinstance(dict_list_or_value, list):
            for i, v in enumerate(dict_list_or_value):
                # Propagate the name of the list to its children so that
                # e.g. every item of "file_paths": [...] is still named "file_paths".
                yield from _extractor_dict_iterator(v, access_path + (i,), name=name)
        else:
            yield extractor_dict_element(
                value=dict_list_or_value,
                name=name,
                access_path=access_path,
            )

    yield from _extractor_dict_iterator(extractor_dict)
229
+
230
+
231
def set_value_in_extractor_dict(extractor_dict: dict, access_path: tuple, new_value):
    """
    In place modification of a value in a nested dictionary given its access path.

    Parameters
    ----------
    extractor_dict : dict
        The dictionary to modify
    access_path : tuple
        The path to the value in the dictionary: the successive keys (or list indices)
        used to subscript from the root down to the value. Must not be empty.
    new_value : object
        The new value to set

    Returns
    -------
    dict
        The modified dictionary. This is the same object as `extractor_dict`
        (the modification is done in place), returned for convenience.

    Raises
    ------
    KeyError, IndexError
        If an intermediate key / index of `access_path` does not exist,
        or if `access_path` is empty.
    """

    # Walk down to the container that holds the final key ...
    current = extractor_dict
    for key in access_path[:-1]:
        current = current[key]
    # ... and overwrite the entry in that container.
    current[access_path[-1]] = new_value

    return extractor_dict
254
+
255
+
186
256
def recursive_path_modifier (d , func , target = "path" , copy = True ) -> dict :
187
257
"""
188
258
Generic function for recursive modification of paths in an extractor dict.
@@ -250,15 +320,17 @@ def recursive_path_modifier(d, func, target="path", copy=True) -> dict:
250
320
raise ValueError (f"{ k } key for path must be str or list[str]" )
251
321
252
322
253
- def _get_paths_list ( d ):
254
- # this explore a dict and get all paths flatten in a list
255
- # the trick is to use a closure func called by recursive_path_modifier( )
256
- path_list = []
323
# Current definition of what makes an element of an extractor_dict a path.
# It is shared by several functions of this module, so it is defined once here (DRY).
def element_is_path(element) -> bool:
    """
    Tell whether an element of an extractor dict describes a path.

    An element is considered a path when its name is a string containing "path"
    and its value is a `str` or a `Path`.

    Parameters
    ----------
    element : extractor_dict_element
        Any object exposing `name` and `value` attributes.

    Returns
    -------
    bool
        True if the element is a path, False otherwise.
    """
    # Dict keys are not guaranteed to be strings (e.g. integer keys): guard the
    # substring test so that such names are simply "not a path" instead of a TypeError.
    return (
        isinstance(element.name, str)
        and "path" in element.name
        and isinstance(element.value, (str, Path))
    )
326
+
257
327
258
- def append_to_path (p ):
259
- path_list .append (p )
328
def _get_paths_list(d: dict) -> list[str | Path]:
    """
    Flatten every path value found in an extractor dict into a list.

    The dict is walked with `extractor_dict_iterator` and the values of the
    elements accepted by `element_is_path` are collected in traversal order.
    """
    path_list = []
    for element in extractor_dict_iterator(d):
        if element_is_path(element):
            path_list.append(element.value)

    # if check_if_exists: TODO: Enable this once container_tools test uses proper mocks
    #     path_list = [p for p in path_list if Path(p).exists()]

    return path_list
263
335
264
336
@@ -318,7 +390,7 @@ def check_paths_relative(input_dict, relative_folder) -> bool:
318
390
return len (not_possible ) == 0
319
391
320
392
321
- def make_paths_relative (input_dict , relative_folder ) -> dict :
393
+ def make_paths_relative (input_dict : dict , relative_folder : str | Path ) -> dict :
322
394
"""
323
395
Recursively transform a dict describing an BaseExtractor to make every path relative to a folder.
324
396
@@ -334,9 +406,22 @@ def make_paths_relative(input_dict, relative_folder) -> dict:
334
406
output_dict: dict
335
407
A copy of the input dict with modified paths.
336
408
"""
409
+
337
410
relative_folder = Path (relative_folder ).resolve ().absolute ()
338
- func = lambda p : _relative_to (p , relative_folder )
339
- output_dict = recursive_path_modifier (input_dict , func , target = "path" , copy = True )
411
+
412
+ path_elements_in_dict = [e for e in extractor_dict_iterator (input_dict ) if element_is_path (e )]
413
+ # Only paths that exist are made relative
414
+ path_elements_in_dict = [e for e in path_elements_in_dict if Path (e .value ).exists ()]
415
+
416
+ output_dict = deepcopy (input_dict )
417
+ for element in path_elements_in_dict :
418
+ new_value = _relative_to (element .value , relative_folder )
419
+ set_value_in_extractor_dict (
420
+ extractor_dict = output_dict ,
421
+ access_path = element .access_path ,
422
+ new_value = new_value ,
423
+ )
424
+
340
425
return output_dict
341
426
342
427
@@ -359,12 +444,28 @@ def make_paths_absolute(input_dict, base_folder):
359
444
base_folder = Path (base_folder )
360
445
# use as_posix instead of str to make the path unix like even on window
361
446
func = lambda p : (base_folder / p ).resolve ().absolute ().as_posix ()
362
- output_dict = recursive_path_modifier (input_dict , func , target = "path" , copy = True )
447
+
448
+ path_elements_in_dict = [e for e in extractor_dict_iterator (input_dict ) if element_is_path (e )]
449
+ output_dict = deepcopy (input_dict )
450
+
451
+ output_dict = deepcopy (input_dict )
452
+ for element in path_elements_in_dict :
453
+ absolute_path = (base_folder / element .value ).resolve ()
454
+ if Path (absolute_path ).exists ():
455
+ new_value = absolute_path .as_posix () # Not so sure about this, Sam
456
+ set_value_in_extractor_dict (
457
+ extractor_dict = output_dict ,
458
+ access_path = element .access_path ,
459
+ new_value = new_value ,
460
+ )
461
+
363
462
return output_dict
364
463
365
464
366
465
def recursive_key_finder (d , key ):
367
466
# Find all values for a key on a dictionary, even if nested
467
+ # TODO refactor to use extractor_dict_iterator
468
+
368
469
for k , v in d .items ():
369
470
if isinstance (v , dict ):
370
471
yield from recursive_key_finder (v , key )
0 commit comments