@@ -263,50 +263,42 @@ def __init__(self, output_dir, MAX_WORKERS, **kwargs):
263
263
]
264
264
265
265
@retry(tries=5, delay=5, backoff=2, jitter=(2, 9))
def downloadSeries(self, series_instance_uids, path=""):
    """Download the selected series from TCIA using a thread pool.

    Args:
        series_instance_uids: Iterable of ``(SeriesInstanceUID, modality)``
            pairs. Any iterable works; it is consumed exactly once.
        path: Directory under which each series is extracted into a
            sub-folder named after its SeriesInstanceUID.

    A pair is skipped when ``self.include`` is non-empty and does not
    contain the modality, or when ``self.exclude`` is non-empty and does
    contain it. Each remaining series is handed to ``self.download_helper``
    on a pool of ``self.MAX_WORKERS`` threads.

    Raises:
        Whatever a worker raises and does not handle itself; it is re-raised
        by ``future.result()``.
    """
    base_url = "https://services.cancerimagingarchive.net/nbia-api/services/v1/"
    downloadOptions = "getImage?NewFileNames=Yes&SeriesInstanceUID="

    # Filter before creating the progress task so that its total equals the
    # number of downloads actually submitted; otherwise skipped series leave
    # the bar permanently short of 100%.
    selected_uids = [
        seriesUID
        for seriesUID, modality in series_instance_uids
        if (not self.include or modality in self.include)
        and (not self.exclude or modality not in self.exclude)
    ]

    # Progress is the outer context: the executor (inner) is shut down first,
    # so the progress display is still alive while the last workers report.
    with Progress() as progress, ThreadPoolExecutor(
        max_workers=self.MAX_WORKERS
    ) as executor:
        task = progress.add_task(
            "Downloading series from TCIA", total=len(selected_uids)
        )
        futures = [
            executor.submit(
                self.download_helper,
                base_url + downloadOptions + seriesUID,
                os.path.join(path, seriesUID),
                progress,
                task,
            )
            for seriesUID in selected_uids
        ]

        for future in as_completed(futures):
            # Surface any exception that escaped a worker.
            future.result()
291
+
292
def download_helper(self, data_url, path, progress, task, timeout=(10, 300)):
    """Download one series archive and extract it into ``path``.

    Args:
        data_url: Full URL of the zip archive to fetch.
        path: Target directory. If it already exists as a directory the
            series is treated as downloaded and nothing is fetched.
        progress: Object exposing ``update(task, advance=...)``.
        task: Task identifier passed back to ``progress.update``.
        timeout: ``(connect, read)`` timeout in seconds forwarded to
            ``requests.get`` so a stalled connection cannot block a worker
            thread forever.

    Failures are logged rather than raised. The progress task is advanced
    exactly once per call, whatever the outcome, so the bar reaches its total
    even when some series are already present or fail.
    """
    extraction_started = False
    try:
        if os.path.isdir(path):
            return  # already on disk; nothing to fetch

        data = requests.get(data_url, timeout=timeout)
        if data.status_code != 200:
            logging.error(
                f"Failed to download series from {data_url}: "
                f"HTTP {data.status_code}"
            )
            return

        with zipfile.ZipFile(io.BytesIO(data.content)) as file:
            extraction_started = True
            file.extractall(path=path)
    except Exception as e:
        if extraction_started:
            # A half-extracted folder would pass the isdir() check above on
            # the next run and be mistaken for a complete download.
            shutil.rmtree(path, ignore_errors=True)
        logging.error(f"Failed to download series from {data_url}: {e}")
    finally:
        progress.update(task, advance=1)
310
302
311
303
def find_and_process_series (self ):
312
304
with open (self .MANIFEST_FILE , "r" ) as file :
@@ -315,56 +307,51 @@ def find_and_process_series(self):
315
307
for entry in manifest :
316
308
patient_id = entry .get ("PatientID" )
317
309
for modality in self .modalities :
318
- if modality in entry :
319
- # Process each series under the modality
320
- if modality not in ["PatientID" , "StudyInstanceUID" ] and (
310
+ if (
311
+ modality in entry
312
+ and modality not in ["PatientID" , "StudyInstanceUID" ]
313
+ and (
321
314
(self .include and modality in self .include )
322
315
or (self .exclude and modality not in self .exclude )
323
- ):
324
- for series in entry [modality ]:
325
- series_instance_uid = series .get ("SeriesInstanceUID" )
326
- if series_instance_uid :
327
- self .move_series_folder (
328
- series_instance_uid , patient_id , modality
329
- )
316
+ )
317
+ ):
318
+ for series in entry [modality ]:
319
+ series_instance_uid = series .get ("SeriesInstanceUID" )
320
+ if series_instance_uid :
321
+ self .move_series_folder (
322
+ series_instance_uid , patient_id , modality
323
+ )
330
324
331
325
def move_series_folder(self, series_instance_uid, patient_id, modality):
    """Move a downloaded series folder into the ``raw`` tree.

    The series is expected at ``<output_dir>/<series_instance_uid>`` and ends
    up at
    ``<output_dir>/raw/<patient_id>/<modality>/<series_instance_uid>/<series_instance_uid>``.
    That is where the first move has always landed, and it is the location
    the existing overwrite check inspects.

    Args:
        series_instance_uid: Name of the downloaded folder.
        patient_id: Patient directory name under ``raw``.
        modality: Modality directory name under the patient.

    If the source folder does not exist, nothing is created or moved. If the
    final location already exists it is replaced, and the resulting layout is
    the same as for a first-time move.
    """
    source_path = os.path.join(self.output_dir, series_instance_uid)
    if not os.path.exists(source_path):
        # Not downloaded: do not leave an empty destination tree behind.
        logging.debug(f"Series not found: {series_instance_uid}")
        return

    dest_path = os.path.join(
        self.output_dir, "raw", patient_id, modality, series_instance_uid
    )
    final_path = os.path.join(dest_path, series_instance_uid)

    os.makedirs(dest_path, exist_ok=True)
    if os.path.exists(final_path):
        # Remove only the previous copy. Removing dest_path itself would make
        # the move below land one level higher than on a first run.
        logging.warning(f"Overwriting existing series: {series_instance_uid}")
        shutil.rmtree(final_path)
    shutil.move(source_path, final_path)
348
336
349
337
def process_cases(self):
    """Collect the series named in the manifest, download them, then file them.

    Reads ``self.MANIFEST_FILE`` as JSON and walks every entry and every
    modality in ``self.modalities``. A ``(SeriesInstanceUID, modality)`` pair
    is collected for each series that carries a ``SeriesInstanceUID`` key and
    whose modality is either listed in a non-empty ``self.include`` or absent
    from a non-empty ``self.exclude``. When both filters are empty no series
    is collected.

    The collected pairs go to ``self.downloadSeries`` with
    ``path=self.output_dir``; ``self.find_and_process_series`` runs afterwards.
    """
    with open(self.MANIFEST_FILE, "r") as manifest_handle:
        cases = json.load(manifest_handle)

    wanted = []
    for case in cases:
        for modality in self.modalities:
            if modality not in case:
                continue
            for record in case[modality]:
                if "SeriesInstanceUID" not in record:
                    continue
                included = self.include and modality in self.include
                not_excluded = self.exclude and modality not in self.exclude
                if included or not_excluded:
                    wanted.append((record["SeriesInstanceUID"], modality))

    self.downloadSeries(wanted, path=self.output_dir)
    self.find_and_process_series()
0 commit comments