Skip to content

Commit 4def7a5

Browse files
authored
feat: parallelize download (#169)
* feat: starting parallel job manager
* fix: get parallel process working for download
* feat: update testcases
* chore: update version
* chore: update readme
* feat: add configurable parallel count
1 parent 46dbb10 commit 4def7a5

File tree

15 files changed

+510
-36
lines changed

15 files changed

+510
-36
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [5.2.0] - 2023-07-05
9+
### Added
10+
- [#169](https://github.com/unity-sds/unity-data-services/pull/169) feat: parallelize download
11+
812
## [5.1.0] - 2023-06-08
913
### Added
1014
- [#156](https://github.com/unity-sds/unity-data-services/pull/156) feat: added filter keyword in granules endpoint + repeatedly checking with time boundary for cataloging result

cumulus_lambda_functions/lib/processing_jobs/__init__.py

Whitespace-only changes.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
4+
class JobExecutorAbstract(metaclass=ABCMeta):
    """Interface for objects that validate and then run a single job.

    Both methods are abstract, so a subclass must override them before it
    can be instantiated.
    """

    @abstractmethod
    def validate_job(self, job_obj):
        """Inspect ``job_obj`` before it is executed.

        :param job_obj: the job content to check
        :return: None in this base implementation; subclasses define the
                 actual result
        """
        return None

    @abstractmethod
    def execute_job(self, job_obj, lock) -> bool:
        """Run ``job_obj``.

        :param job_obj: the job content to execute
        :param lock: lock object made available to the executor
        :return: annotated as ``bool``; this base implementation itself
                 returns None
        """
        return None
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
from cumulus_lambda_functions.lib.utils.fake_lock import FakeLock
4+
5+
6+
class JobManagerProps:
    """Settings shared by the job manager implementations.

    Holds the job locations, the file-name postfix used to select job
    files, the lock guarding job hand-off and the dictionary backing the
    in-memory manager.  Every value is exposed as a read/write property.
    """

    # Keys copied by load_from_json; each one is also a property name.
    # 'lock' is included so a real lock can be supplied through the same
    # dictionary instead of being silently ignored.
    _LOADABLE_KEYS = (
        'memory_job_dict',
        'job_bucket',
        'job_path',
        'processing_job_path',
        'job_file_postfix',
        'lock',
    )

    def __init__(self):
        self.__job_bucket = None
        self.__job_path = None
        self.__job_file_postfix = ''
        self.__processing_job_path = None
        # NOTE(review): FakeLock is presumably a no-op stand-in for
        # single-worker use -- confirm against utils.fake_lock.
        self.__lock = FakeLock()
        self.__memory_job_dict = {}

    def load_from_json(self, input_json: dict):
        """Copy the recognised keys of ``input_json`` onto this object.

        Keys that are absent leave the current value untouched; unknown
        keys are ignored.

        :param input_json: mapping of property name to value; ``None`` or
                           an empty mapping is a no-op
        :return: this instance, to allow call chaining
        """
        if not input_json:
            return self
        for key in self._LOADABLE_KEYS:
            if key in input_json:
                setattr(self, key, input_json[key])
        return self

    @property
    def memory_job_dict(self):
        """Dictionary of job path -> job content used by the in-memory manager."""
        return self.__memory_job_dict

    @memory_job_dict.setter
    def memory_job_dict(self, val):
        """
        :param val: dictionary of job path -> job content
        :return: None
        """
        self.__memory_job_dict = val

    @property
    def job_file_postfix(self):
        """File-name ending used to select job files (defaults to '')."""
        return self.__job_file_postfix

    @job_file_postfix.setter
    def job_file_postfix(self, val):
        """
        :param val: file-name ending of job files
        :return: None
        """
        self.__job_file_postfix = val

    @property
    def lock(self):
        """Lock object guarding job hand-off between workers."""
        return self.__lock

    @lock.setter
    def lock(self, val):
        """
        :param val: lock object usable as a context manager
        :return: None
        """
        self.__lock = val

    @property
    def job_bucket(self):
        """Base location that job paths are resolved against."""
        return self.__job_bucket

    @job_bucket.setter
    def job_bucket(self, val):
        """
        :param val: base location of the jobs
        :return: None
        """
        self.__job_bucket = val

    @property
    def job_path(self):
        """Location, relative to ``job_bucket``, holding the pending job files."""
        return self.__job_path

    @job_path.setter
    def job_path(self, val):
        """
        :param val: location of pending job files, relative to the bucket
        :return: None
        """
        self.__job_path = val

    @property
    def processing_job_path(self):
        """Location, relative to ``job_bucket``, holding jobs being processed."""
        return self.__processing_job_path

    @processing_job_path.setter
    def processing_job_path(self, val):
        """
        :param val: location of in-flight job files, relative to the bucket
        :return: None
        """
        self.__processing_job_path = val
105+
106+
107+
class JobManagerAbstract(metaclass=ABCMeta):
    """Contract for a store that hands out jobs and tracks those in flight.

    All four methods are abstract; the bodies here only return None.
    """

    @abstractmethod
    def get_all_job_files(self):
        """List the jobs that are currently available.

        :return: None in this base implementation
        """
        return None

    @abstractmethod
    def get_job_file(self, job_path, validate_job_content=lambda x: True):
        """Fetch one job so that it can be processed.

        :param job_path: identifier of the job to fetch
        :param validate_job_content: callable receiving the job content;
                                     the default accepts everything
        :return: None in this base implementation
        """
        return None

    @abstractmethod
    def remove_from_processing(self, job_path):
        """Forget a job that was previously handed out.

        :param job_path: identifier of the job
        :return: None in this base implementation
        """
        return None

    @abstractmethod
    def put_back_failed_job(self, original_job_path: str):
        """Make a handed-out job available again.

        :param original_job_path: identifier the job was fetched under
        :return: None in this base implementation
        """
        return None
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
2+
from cumulus_lambda_functions.lib.processing_jobs.job_manager_abstract import JobManagerProps
3+
from cumulus_lambda_functions.lib.processing_jobs.job_manager_local_filesystem import JobManagerLocalFileSystem
4+
from cumulus_lambda_functions.lib.processing_jobs.job_manager_memory import JobManagerMemory
5+
from cumulus_lambda_functions.lib.utils.factory_abstract import FactoryAbstract
6+
7+
8+
class JobManagerFactory(FactoryAbstract):
    """Builds a job manager from a type name and keyword settings."""

    MEMORY = 'MEMORY'
    LOCAL = 'LOCAL'

    def get_instance(self, class_type, **kwargs):
        """Create the job manager registered under ``class_type``.

        :param class_type: manager type name, matched case-insensitively
                           against ``LOCAL`` and ``MEMORY``
        :param kwargs: settings loaded into a ``JobManagerProps`` and
                       handed to the manager
        :return: a ``JobManagerLocalFileSystem`` or ``JobManagerMemory``
        :raises ModuleNotFoundError: when ``class_type`` matches neither name
        """
        manager_props = JobManagerProps().load_from_json(kwargs)
        requested = class_type.upper()
        # LOCAL is listed last so it takes precedence if both names coincide.
        managers_by_name = {
            self.MEMORY: JobManagerMemory,
            self.LOCAL: JobManagerLocalFileSystem,
        }
        manager_class = managers_by_name.get(requested)
        if manager_class is None:
            raise ModuleNotFoundError(f'cannot find JobManagerFactory class for {class_type}')
        return manager_class(manager_props)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import logging
2+
import os
3+
from glob import glob
4+
5+
from cumulus_lambda_functions.lib.processing_jobs.job_manager_abstract import JobManagerAbstract, JobManagerProps
6+
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
7+
8+
LOGGER = logging.getLogger(__name__)
9+
10+
11+
class JobManagerLocalFileSystem(JobManagerAbstract):
    """Job manager that keeps jobs as JSON files on the local file system.

    Pending jobs live under ``<job_bucket>/<job_path>``; a job that has been
    handed out is kept under ``<job_bucket>/<processing_job_path>`` until it
    is either removed or put back.
    """

    def __init__(self, props=None) -> None:
        """
        :param props: ``JobManagerProps`` with the locations and lock to use;
                      a fresh default instance is created when omitted
        """
        super().__init__()
        # Built here rather than in the signature: a default created in the
        # signature is evaluated once and shared by every manager.
        self.__props = JobManagerProps() if props is None else props

    def get_all_job_files(self):
        """List pending job files.

        :return: paths relative to ``job_bucket`` of every file in
                 ``job_path`` whose name ends with ``job_file_postfix``
        """
        bucket = self.__props.job_bucket
        pattern = os.path.join(bucket, self.__props.job_path, f'*{self.__props.job_file_postfix}')
        # relpath strips only the leading bucket; a plain string replace would
        # also remove the bucket text wherever else it occurs in the path and
        # miscounts the separator when the bucket ends with one.
        return [os.path.relpath(found, bucket) for found in glob(pattern)]

    def get_job_file(self, job_path, validate_job_content=lambda x: True):
        """Claim one pending job and move it to the processing location.

        :param job_path: job file path relative to ``job_bucket``
        :param validate_job_content: callable receiving the parsed job; the
                                     job is rejected only when it returns
                                     exactly ``False``
        :return: the parsed job content, or None when the file cannot be
                 read as JSON or is rejected by the validator (in both cases
                 the pending file is left in place)
        """
        job_abs_path = os.path.join(self.__props.job_bucket, job_path)
        processing_dir = os.path.join(self.__props.job_bucket, self.__props.processing_job_path)
        # The whole hand-off runs under the lock so that another worker never
        # sees the job half-moved.
        with self.__props.lock:
            try:
                job_content = FileUtils.read_json(job_abs_path)
            except Exception:
                LOGGER.exception(f'job in this path is not in JSON: {job_path}')
                return None
            # Validate before touching the pending file: a rejected job (or a
            # validator that raises) then cannot cause the job to be lost.
            if validate_job_content(job_content) is False:
                LOGGER.error(f'{job_path} does not have valid json: {job_content}. Putting back')
                return None
            LOGGER.debug('writing job to processing path')
            FileUtils.mk_dir_p(processing_dir)
            FileUtils.write_json(os.path.join(processing_dir, os.path.basename(job_path)), job_content, overwrite=True)
            # Only drop the pending copy once the processing copy exists.
            FileUtils.remove_if_exists(job_abs_path)
        return job_content

    def put_back_failed_job(self, original_job_path: str):
        """Move a job from the processing location back to its pending path.

        Errors are logged and swallowed; nothing is raised to the caller.

        :param original_job_path: path, relative to ``job_bucket``, the job
                                  was originally fetched from
        :return: None
        """
        processing_job_path = os.path.join(self.__props.processing_job_path, os.path.basename(original_job_path))
        processing_abs_path = os.path.join(self.__props.job_bucket, processing_job_path)
        LOGGER.debug(f'reading failed job file: {processing_job_path}')
        try:
            job_json: dict = FileUtils.read_json(processing_abs_path)
            # Restore the pending copy first so a failed write cannot leave
            # the job without any copy on disk.
            LOGGER.debug('uploading failed job back to folder')
            FileUtils.write_json(os.path.join(self.__props.job_bucket, original_job_path), job_json)
            LOGGER.debug(f'deleting failed job file from processing dir: {processing_job_path}')
            FileUtils.remove_if_exists(processing_abs_path)
        except Exception:
            LOGGER.exception(f'error while reading failed job file from from processing dir: {processing_job_path}')
        return

    def remove_from_processing(self, job_path):
        """Delete the processing copy of a job.

        :param job_path: job path; only its base name is used
        :return: always True
        """
        FileUtils.remove_if_exists(os.path.join(self.__props.job_bucket, self.__props.processing_job_path, os.path.basename(job_path)))
        return True
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from cumulus_lambda_functions.lib.processing_jobs.job_manager_abstract import JobManagerAbstract, JobManagerProps
2+
3+
4+
class JobManagerMemory(JobManagerAbstract):
    """Job manager backed by dictionaries held in memory.

    Pending jobs are read from ``props.memory_job_dict``; jobs that have
    been handed out are tracked in a private dictionary until they are
    removed or put back.
    """

    def __init__(self, props=None) -> None:
        """
        :param props: ``JobManagerProps`` whose ``memory_job_dict`` holds the
                      pending jobs; a fresh default instance is created when
                      omitted
        """
        super().__init__()
        # Built here rather than in the signature: a default created in the
        # signature is evaluated once, so every manager would share (and
        # mutate) the same job dictionary.
        self.__props = JobManagerProps() if props is None else props
        self.__processing_job_dict = {}

    def get_all_job_files(self):
        """
        :return: list of the keys of all pending jobs
        """
        return list(self.__props.memory_job_dict)

    def get_job_file(self, job_path, validate_job_content=lambda x: True):
        """Claim one pending job and mark it as being processed.

        :param job_path: key of the job in the pending dictionary
        :param validate_job_content: callable receiving the job; a falsy
                                     result rejects the job
        :return: the job, or None when the key is unknown or the job is
                 rejected (a rejected job stays pending, matching the
                 file-system manager which puts such jobs back)
        """
        pending_jobs = self.__props.memory_job_dict
        if job_path not in pending_jobs:
            return None
        job = pending_jobs[job_path]
        # Validate before removing so a rejected job is not silently dropped.
        if not validate_job_content(job):
            return None
        del pending_jobs[job_path]
        self.__processing_job_dict[job_path] = job
        return job

    def remove_from_processing(self, job_path):
        """Forget a job that was handed out; unknown keys are ignored.

        :param job_path: key of the job
        :return: None
        """
        self.__processing_job_dict.pop(job_path, None)

    def put_back_failed_job(self, original_job_path: str):
        """Move a handed-out job back to the pending dictionary.

        :param original_job_path: key the job was fetched under; unknown
                                  keys are ignored
        :return: None
        """
        if original_job_path not in self.__processing_job_dict:
            return
        self.__props.memory_job_dict[original_job_path] = self.__processing_job_dict.pop(original_job_path)
        return

0 commit comments

Comments
 (0)