Skip to content

Commit 2c3aa62

Browse files
authored
fix: Cataloging large number asynchronously by batch + download is stuck when there are large number of files (#194)
* fix: retry 5 times + wait 45 sec for DAAC * feat: make re-try configurable * feat: adding fastapi (in-progress) * fix: get collection creation working (in-progress) * feat: default poll checking to 0 times + conflict bug in dapa client * feat: add facade endpoint + real endpoint for cnm * fix: update version * fix: need to call PUT. not POST * fix: update test case * chore: add large data catalog testcase * fix: use Manager().Q instead of normal Q to fix it for download stuck for large granules * chore: adding support files for test * fix: calling catalog api in batch * chore: update changelog * chore: add missing test file * fix: mismatch version
1 parent 05c460c commit 2c3aa62

File tree

19 files changed

+20935
-47
lines changed

19 files changed

+20935
-47
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [5.3.1] - 2023-08-16
9+
### Changed
10+
- [#194](https://github.com/unity-sds/unity-data-services/pull/194) fix: Cataloging large number asynchronously by batch + download is stuck when there are large number of files
11+
12+
## [5.3.0] - 2023-08-07
13+
### Changed
14+
- [#190](https://github.com/unity-sds/unity-data-services/pull/190) feat: Using Fastapi for all API endpoints
15+
816
## [5.2.3] - 2023-08-07
917
### Changed
1018
- [#193](https://github.com/unity-sds/unity-data-services/pull/193) fix: add collection folder in s3 upload

ci.cd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export VERSION ?= latest
66
all: build_lambda upload_lambda update_lambda_function build_docker
77
local: build_lambda upload_lambda update_lambda_function_1 update_lambda_function_2 update_lambda_function_3
88
build_docker:
9-
docker build -t "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" -f docker/Dockerfile .
9+
docker build --no-cache -t "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" -f docker/Dockerfile_download_granules.jpl .
1010

1111
zip_docker:
1212
docker save "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" | gzip > "$(NAME)__$(VERSION).tar.gz"

cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ def __init__(self):
1616
self.__token_retriever = CognitoTokenRetriever()
1717
self.__token = None
1818
self.__dapa_base_api = None
19-
self.__get_dapa_base_api()
2019
self.__verify_ssl = True
2120
self.__api_base_prefix = WebServiceConstants.API_PREFIX
21+
self.__get_dapa_base_api()
2222

2323
def with_verify_ssl(self, verify_ssl: bool):
2424
self.__verify_ssl = verify_ssl
@@ -126,5 +126,5 @@ def ingest_granules_w_cnm(self, cnm_ingest_body: dict) -> str:
126126
if response.status_code > 400:
127127
raise RuntimeError(
128128
f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
129-
granules_result = response.text
129+
granules_result = json.loads(response.text)
130130
return granules_result

cumulus_lambda_functions/lib/processing_jobs/multithread_processor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import time
33
import multiprocessing
44
multiprocessing.set_start_method("fork")
5-
from multiprocessing import Process, Queue, Lock, cpu_count
5+
from multiprocessing import Process, Queue, Lock, cpu_count, Manager
66
from random import randint
77

88
from cumulus_lambda_functions.lib.processing_jobs.job_executor_abstract import JobExecutorAbstract
@@ -142,13 +142,14 @@ def __execute_job(self):
142142
else:
143143
LOGGER.debug(f'executed job: `{job}` ends in Error. putting back to jobs dir')
144144
self.__props.job_manager.put_back_failed_job(job_path)
145+
LOGGER.debug(f'quitting __execute_job')
145146
return
146147

147148
def start(self):
148149
if self.__props.job_executor is None or self.__props.job_manager is None:
149150
raise RuntimeError('missing job_executor or job_manager')
150151
LOGGER.info(f'multithread processing starting with process_count: {self.__props.process_count}')
151-
for i in range(cpu_count()):
152+
for i in range(self.__props.process_count):
152153
p = Process(target=self.__execute_job, args=())
153154
p.daemon = True
154155
self.__props.consumers.append(p)
@@ -162,4 +163,5 @@ def start(self):
162163
for c in self.__props.consumers: # to check if all consumers are done processing it
163164
LOGGER.info('joining consumers: {}. exit_code: {}'.format(c.pid, c.exitcode))
164165
c.join()
166+
LOGGER.debug('joined')
165167
return
Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
22

3+
from cumulus_lambda_functions.stage_in_out.stage_in_out_utils import StageInOutUtils
4+
35
from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
46
from cumulus_lambda_functions.lib.time_utils import TimeUtils
57
from cumulus_lambda_functions.stage_in_out.catalog_granules_abstract import CatalogGranulesAbstract
@@ -16,48 +18,58 @@ class CatalogGranulesUnity(CatalogGranulesAbstract):
1618
VERIFY_SSL_KEY = 'VERIFY_SSL'
1719
DELAY_SECOND = 'DELAY_SECOND'
1820
REPEAT_TIMES = 'REPEAT_TIMES'
21+
CHUNK_SIZE = 'CHUNK_SIZE'
22+
DEFAULT_CHUNK_SIZE = 5
1923

2024
def __init__(self) -> None:
2125
super().__init__()
2226
self.__provider_id = ''
2327
self.__verify_ssl = True
2428
self.__delaying_second = 30
25-
self.__repeating_times = 5
29+
self.__repeating_times = 0
30+
self.__chunk_size = self.DEFAULT_CHUNK_SIZE
2631

2732
def __set_props_from_env(self):
2833
missing_keys = [k for k in [self.UPLOADED_FILES_JSON, self.PROVIDER_ID_KEY] if k not in os.environ]
2934
if len(missing_keys) > 0:
3035
raise ValueError(f'missing environment keys: {missing_keys}')
3136
self._retrieve_stac_json()
37+
self.__chunk_size = int(os.environ.get(self.CHUNK_SIZE, self.DEFAULT_CHUNK_SIZE))
38+
self.__chunk_size = self.__chunk_size if self.__chunk_size > 0 else self.DEFAULT_CHUNK_SIZE
3239
self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
3340
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
3441
self.__delaying_second = int(os.environ.get(self.DELAY_SECOND, '30'))
35-
self.__repeating_times = int(os.environ.get(self.REPEAT_TIMES, '5'))
42+
self.__repeating_times = int(os.environ.get(self.REPEAT_TIMES, '0'))
3643
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
3744
return self
3845

3946
def catalog(self, **kwargs):
4047
self.__set_props_from_env()
4148
if isinstance(self._uploaded_files_json, dict) and 'features' in self._uploaded_files_json:
4249
self._uploaded_files_json = self._uploaded_files_json['features']
43-
dapa_body = {
44-
"provider_id": self.__provider_id,
45-
"features": self._uploaded_files_json
46-
}
47-
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
48-
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
49-
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
50-
extracted_ids = [k['id'] for k in self._uploaded_files_json]
51-
LOGGER.debug(f'checking following IDs: {extracted_ids}')
52-
status_checker = CatalogingGranulesStatusChecker(self._uploaded_files_json[0]['collection'],
53-
extracted_ids,
54-
TimeUtils().get_datetime_obj().timestamp(),
55-
self.__delaying_second,
56-
self.__repeating_times,
57-
self.__verify_ssl)
58-
status_result = status_checker.verify_n_times()
59-
response_json = {
60-
'cataloging_request_status': dapa_ingest_result,
61-
'status_result': status_result
62-
}
63-
return json.dumps(response_json)
50+
51+
response_jsons = []
52+
for i, features_chunk in enumerate(StageInOutUtils.chunk_list(self._uploaded_files_json, self.__chunk_size)):
53+
LOGGER.debug(f'working on chunk_index {i}')
54+
dapa_body = {
55+
"provider_id": self.__provider_id,
56+
"features": features_chunk
57+
}
58+
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
59+
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
60+
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
61+
extracted_ids = [k['id'] for k in features_chunk]
62+
LOGGER.debug(f'checking following IDs: {extracted_ids}')
63+
status_checker = CatalogingGranulesStatusChecker(features_chunk[0]['collection'],
64+
extracted_ids,
65+
TimeUtils().get_datetime_obj().timestamp(),
66+
self.__delaying_second,
67+
self.__repeating_times,
68+
self.__verify_ssl)
69+
status_result = status_checker.verify_n_times()
70+
LOGGER.debug(f'chunk_index {i} status_result: {status_result}')
71+
response_jsons.append({
72+
'cataloging_request_status': dapa_ingest_result,
73+
'status_result': status_result
74+
})
75+
return json.dumps(response_jsons)

cumulus_lambda_functions/stage_in_out/cataloging_granules_status_checker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def __init__(self, collection_id: str, granules_ids: list, threshold_datetime: i
1313
self.__granules_ids = granules_ids
1414
self.__threshold_datetime = threshold_datetime
1515
self.__delay = delay
16-
self.__repeating_times = repeating_times if repeating_times > 0 else 1
16+
self.__repeating_times = repeating_times if repeating_times >= 0 else 0
1717
self.__dapa_client = DapaClient().with_verify_ssl(veriffy_ssl)
1818
self.__registered_granules = {}
1919

cumulus_lambda_functions/stage_in_out/download_granules_abstract.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import os
44
from abc import ABC, abstractmethod
5-
from multiprocessing import Queue
5+
from multiprocessing import Queue, Manager
66

77
from pystac import ItemCollection, Asset, Item
88

@@ -40,12 +40,11 @@ def execute_job(self, granule_item, lock) -> bool:
4040
value_dict.href = os.path.join('.', os.path.basename(downloading_url))
4141
new_asset_dict[name] = value_dict
4242
granule_item.assets = new_asset_dict
43-
with lock:
44-
self.__result_list.put(granule_item)
43+
self.__result_list.put(granule_item)
4544
except Exception as e:
4645
LOGGER.exception(f'error downloading granule: {granule_item.id}')
47-
with lock:
48-
self.__error_list.put({'error': str(e), 'id': granule_item.id, })
46+
self.__error_list.put({'error': str(e), 'id': granule_item.id, })
47+
LOGGER.debug(f'done DownloadItemExecutor#execute_job')
4948
return True # always return true?
5049

5150

@@ -102,8 +101,8 @@ def download(self, **kwargs) -> str:
102101
return json.dumps(granules_json_dict)
103102
# local_items = []
104103
# error_list = []
105-
local_items = Queue()
106-
error_list = Queue()
104+
local_items = Manager().Queue()
105+
error_list = Manager().Queue()
107106
job_manager_props = JobManagerProps()
108107
for each_item in self._granules_json.items:
109108
job_manager_props.memory_job_dict[each_item.id] = each_item

cumulus_lambda_functions/stage_in_out/stage_in_out_utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,10 @@ def write_output_to_file(output_json: Union[dict, str, list]):
2222
with open(output_filepath, 'w') as ff:
2323
ff.write(output_str)
2424
return
25+
26+
@staticmethod
27+
def chunk_list(input_list, chunked_size):
28+
"""Yield successive n-sized chunks from l."""
29+
for i in range(0, len(input_list), chunked_size):
30+
yield input_list[i:i + chunked_size]
31+
return

cumulus_lambda_functions/uds_api/collections_api.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,23 @@
1919

2020
@router.put("")
2121
@router.put("/")
22-
async def ingest_cnm_dapa(request: Request, new_cnm_body: CnmRequestBody):
22+
async def ingest_cnm_dapa(request: Request, new_cnm_body: CnmRequestBody, response: Response):
2323
LOGGER.debug(f'starting ingest_cnm_dapa')
24+
try:
25+
cnm_prep_result = CollectionsDapaCnm(new_cnm_body.model_dump()).start_facade(request.url)
26+
except Exception as e:
27+
LOGGER.exception('failed during ingest_cnm_dapa')
28+
raise HTTPException(status_code=500, detail=str(e))
29+
if cnm_prep_result['statusCode'] < 300:
30+
response.status_code = cnm_prep_result['statusCode']
31+
return cnm_prep_result['body']
32+
raise HTTPException(status_code=cnm_prep_result['statusCode'], detail=cnm_prep_result['body'])
33+
34+
35+
@router.put("/actual")
36+
@router.put("/actual/")
37+
async def ingest_cnm_dapa_actual(request: Request, new_cnm_body: CnmRequestBody):
38+
LOGGER.debug(f'starting ingest_cnm_dapa_actual')
2439
try:
2540
collections_dapa_cnm = CollectionsDapaCnm(new_cnm_body.model_dump())
2641
cnm_result = collections_dapa_cnm.start()
@@ -76,4 +91,4 @@ async def query_collections(request: Request, collection_id: Union[str, None] =
7691
raise HTTPException(status_code=500, detail=str(e))
7792
if collections_result['statusCode'] == 200:
7893
return collections_result['body']
79-
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])
94+
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])

cumulus_lambda_functions/uds_api/dapa/collections_dapa_cnm.py

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
import os
33
from typing import Union
44

5+
from cumulus_lambda_functions.lib.aws.aws_lambda import AwsLambda
6+
from starlette.datastructures import URL
7+
58
from cumulus_lambda_functions.lib.aws.aws_sns import AwsSns
69

710
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
@@ -24,10 +27,73 @@ class CnmRequestBody(BaseModel):
2427

2528
class CollectionsDapaCnm:
2629
def __init__(self, request_body):
27-
if 'SNS_TOPIC_ARN' not in os.environ:
28-
raise EnvironmentError('missing key: SNS_TOPIC_ARN')
30+
required_env = ['SNS_TOPIC_ARN', 'COLLECTION_CREATION_LAMBDA_NAME']
31+
if not all([k in os.environ for k in required_env]):
32+
raise EnvironmentError(f'one or more missing env: {required_env}')
2933
self.__sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
3034
self.__request_body = request_body
35+
self.__collection_cnm_lambda_name = os.environ.get('COLLECTION_CREATION_LAMBDA_NAME', '').strip()
36+
37+
38+
def start_facade(self, current_url: URL):
39+
LOGGER.debug(f'request body: {self.__request_body}')
40+
41+
actual_path = current_url.path
42+
actual_path = actual_path if actual_path.endswith('/') else f'{actual_path}/'
43+
actual_path = f'{actual_path}actual'
44+
LOGGER.info(f'sanity_check')
45+
46+
actual_event = {
47+
'resource': actual_path,
48+
'path': actual_path,
49+
'httpMethod': 'PUT',
50+
'headers': {
51+
'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Authorization': 'Bearer xxx',
52+
'Host': current_url.hostname, 'User-Agent': 'python-requests/2.28.2',
53+
'X-Amzn-Trace-Id': 'Root=1-64a66e90-6fa8b7a64449014639d4f5b4', 'X-Forwarded-For': '44.236.15.58',
54+
'X-Forwarded-Port': '443', 'X-Forwarded-Proto': 'https'},
55+
'multiValueHeaders': {
56+
'Accept': ['*/*'], 'Accept-Encoding': ['gzip, deflate'], 'Authorization': ['Bearer xxx'],
57+
'Host': [current_url.hostname], 'User-Agent': ['python-requests/2.28.2'],
58+
'X-Amzn-Trace-Id': ['Root=1-64a66e90-6fa8b7a64449014639d4f5b4'],
59+
'X-Forwarded-For': ['127.0.0.1'], 'X-Forwarded-Port': ['443'], 'X-Forwarded-Proto': ['https']
60+
},
61+
'queryStringParameters': {},
62+
'multiValueQueryStringParameters': {},
63+
'pathParameters': {},
64+
'stageVariables': None,
65+
'requestContext': {
66+
'resourceId': '',
67+
'authorizer': {'principalId': '', 'integrationLatency': 0},
68+
'resourcePath': actual_path, 'httpMethod': 'PUT',
69+
'extendedRequestId': '', 'requestTime': '',
70+
'path': actual_path, 'accountId': '',
71+
'protocol': 'HTTP/1.1', 'stage': '', 'domainPrefix': '', 'requestTimeEpoch': 0,
72+
'requestId': '',
73+
'identity': {
74+
'cognitoIdentityPoolId': None, 'accountId': None, 'cognitoIdentityId': None, 'caller': None,
75+
'sourceIp': '127.0.0.1', 'principalOrgId': None, 'accessKey': None,
76+
'cognitoAuthenticationType': None,
77+
'cognitoAuthenticationProvider': None, 'userArn': None, 'userAgent': 'python-requests/2.28.2',
78+
'user': None
79+
},
80+
'domainName': current_url.hostname, 'apiId': ''
81+
},
82+
'body': json.dumps(self.__request_body),
83+
'isBase64Encoded': False
84+
}
85+
LOGGER.info(f'actual_event: {actual_event}')
86+
response = AwsLambda().invoke_function(
87+
function_name=self.__collection_cnm_lambda_name,
88+
payload=actual_event,
89+
)
90+
LOGGER.debug(f'async function started: {response}')
91+
return {
92+
'statusCode': 202,
93+
'body': {
94+
'message': 'processing'
95+
}
96+
}
3197

3298
def start(self):
3399
"""

cumulus_lambda_functions/uds_api/granules_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ async def get_granules_dapa(request: Request, collection_id: str, limit: Union[i
2929
raise HTTPException(status_code=500, detail=str(e))
3030
if granules_result['statusCode'] == 200:
3131
return granules_result['body']
32-
raise HTTPException(status_code=granules_result['statusCode'], detail=granules_result['body'])
32+
raise HTTPException(status_code=granules_result['statusCode'], detail=granules_result['body'])

cumulus_lambda_functions/uds_api/routes_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@
1010
main_router = APIRouter(redirect_slashes=False)
1111
# main_router.include_router(setup_es.router)
1212
main_router.include_router(collections_api.router)
13-
main_router.include_router(granules_api.router)
13+
main_router.include_router(granules_api.router)

docker/stage-in-stage-out/dc-004-catalog.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ services:
3030

3131
DELAY_SECOND: '45'
3232
REPEAT_TIMES: '5'
33+
CHUNK_SIZE: '5'
3334

3435
networks:
3536
- internal

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
setup(
2424
name="cumulus_lambda_functions",
25-
version="5.2.3",
25+
version="5.3.1",
2626
packages=find_packages(),
2727
install_requires=install_requires,
2828
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],

tests/cumulus_lambda_functions/uds_api/test_uds_api.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def test_create_new_collection(self):
113113
# TODO check if collection shows up
114114
return
115115

116-
def test_cnm(self):
116+
def test_cnm_facade(self):
117117
os.environ[Constants.USERNAME] = '/unity/uds/user/wphyo/username'
118118
os.environ[Constants.PASSWORD] = '/unity/uds/user/wphyo/dwssap'
119119
os.environ[Constants.PASSWORD_TYPE] = Constants.PARAM_STORE
@@ -151,6 +151,5 @@ def test_cnm(self):
151151
headers=headers,
152152
json=stac_collection,
153153
)
154-
self.assertEqual(query_result.status_code, 200, f'wrong status code. {query_result.text}')
154+
self.assertEqual(query_result.status_code, 202, f'wrong status code. {query_result.text}')
155155
return
156-

0 commit comments

Comments (0)