
fix: Cataloging large number asynchronously by batch + download is stuck when there are large number of files #194

Merged · 20 commits · Aug 16, 2023
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [5.3.1] - 2023-08-16
### Changed
- [#194](https://github.com/unity-sds/unity-data-services/pull/194) fix: Cataloging large number asynchronously by batch + download is stuck when there are large number of files

## [5.3.0] - 2023-08-07
### Changed
- [#190](https://github.com/unity-sds/unity-data-services/pull/190) feat: Using Fastapi for all API endpoints

## [5.2.3] - 2023-08-07
### Changed
- [#193](https://github.com/unity-sds/unity-data-services/pull/193) fix: add collection folder in s3 upload
2 changes: 1 addition & 1 deletion ci.cd/Makefile
@@ -6,7 +6,7 @@ export VERSION ?= latest
all: build_lambda upload_lambda update_lambda_function build_docker
local: build_lambda upload_lambda update_lambda_function_1 update_lambda_function_2 update_lambda_function_3
build_docker:
docker build -t "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" -f docker/Dockerfile .
docker build --no-cache -t "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" -f docker/Dockerfile_download_granules.jpl .

zip_docker:
docker save "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" | gzip > "$(NAME)__$(VERSION).tar.gz"
4 changes: 2 additions & 2 deletions cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py
@@ -16,9 +16,9 @@ def __init__(self):
self.__token_retriever = CognitoTokenRetriever()
self.__token = None
self.__dapa_base_api = None
self.__get_dapa_base_api()
self.__verify_ssl = True
self.__api_base_prefix = WebServiceConstants.API_PREFIX
self.__get_dapa_base_api()

def with_verify_ssl(self, verify_ssl: bool):
self.__verify_ssl = verify_ssl
@@ -126,5 +126,5 @@ def ingest_granules_w_cnm(self, cnm_ingest_body: dict) -> str:
if response.status_code > 400:
raise RuntimeError(
f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
granules_result = response.text
granules_result = json.loads(response.text)
return granules_result
@@ -2,7 +2,7 @@
import time
import multiprocessing
multiprocessing.set_start_method("fork")
from multiprocessing import Process, Queue, Lock, cpu_count
from multiprocessing import Process, Queue, Lock, cpu_count, Manager
from random import randint

from cumulus_lambda_functions.lib.processing_jobs.job_executor_abstract import JobExecutorAbstract
@@ -142,13 +142,14 @@ def __execute_job(self):
else:
LOGGER.debug(f'executed job: `{job}` ends in Error. putting back to jobs dir')
self.__props.job_manager.put_back_failed_job(job_path)
LOGGER.debug(f'quitting __execute_job')
return

def start(self):
if self.__props.job_executor is None or self.__props.job_manager is None:
raise RuntimeError('missing job_executor or job_manager')
LOGGER.info(f'multithread processing starting with process_count: {self.__props.process_count}')
for i in range(cpu_count()):
for i in range(self.__props.process_count):
p = Process(target=self.__execute_job, args=())
p.daemon = True
self.__props.consumers.append(p)
@@ -162,4 +163,5 @@ def start(self):
for c in self.__props.consumers: # to check if all consumers are done processing it
LOGGER.info('joining consumers: {}. exit_code: {}'.format(c.pid, c.exitcode))
c.join()
LOGGER.debug('joined')
return
58 changes: 35 additions & 23 deletions cumulus_lambda_functions/stage_in_out/catalog_granules_unity.py
@@ -1,5 +1,7 @@
import json

from cumulus_lambda_functions.stage_in_out.stage_in_out_utils import StageInOutUtils

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.time_utils import TimeUtils
from cumulus_lambda_functions.stage_in_out.catalog_granules_abstract import CatalogGranulesAbstract
@@ -16,48 +18,58 @@ class CatalogGranulesUnity(CatalogGranulesAbstract):
VERIFY_SSL_KEY = 'VERIFY_SSL'
DELAY_SECOND = 'DELAY_SECOND'
REPEAT_TIMES = 'REPEAT_TIMES'
CHUNK_SIZE = 'CHUNK_SIZE'
DEFAULT_CHUNK_SIZE = 5

def __init__(self) -> None:
super().__init__()
self.__provider_id = ''
self.__verify_ssl = True
self.__delaying_second = 30
self.__repeating_times = 5
self.__repeating_times = 0
self.__chunk_size = self.DEFAULT_CHUNK_SIZE

def __set_props_from_env(self):
missing_keys = [k for k in [self.UPLOADED_FILES_JSON, self.PROVIDER_ID_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')
self._retrieve_stac_json()
self.__chunk_size = int(os.environ.get(self.CHUNK_SIZE, self.DEFAULT_CHUNK_SIZE))
self.__chunk_size = self.__chunk_size if self.__chunk_size > 0 else self.DEFAULT_CHUNK_SIZE
self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
self.__delaying_second = int(os.environ.get(self.DELAY_SECOND, '30'))
self.__repeating_times = int(os.environ.get(self.REPEAT_TIMES, '5'))
self.__repeating_times = int(os.environ.get(self.REPEAT_TIMES, '0'))
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
return self

def catalog(self, **kwargs):
self.__set_props_from_env()
if isinstance(self._uploaded_files_json, dict) and 'features' in self._uploaded_files_json:
self._uploaded_files_json = self._uploaded_files_json['features']
dapa_body = {
"provider_id": self.__provider_id,
"features": self._uploaded_files_json
}
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
extracted_ids = [k['id'] for k in self._uploaded_files_json]
LOGGER.debug(f'checking following IDs: {extracted_ids}')
status_checker = CatalogingGranulesStatusChecker(self._uploaded_files_json[0]['collection'],
extracted_ids,
TimeUtils().get_datetime_obj().timestamp(),
self.__delaying_second,
self.__repeating_times,
self.__verify_ssl)
status_result = status_checker.verify_n_times()
response_json = {
'cataloging_request_status': dapa_ingest_result,
'status_result': status_result
}
return json.dumps(response_json)

response_jsons = []
for i, features_chunk in enumerate(StageInOutUtils.chunk_list(self._uploaded_files_json, self.__chunk_size)):
LOGGER.debug(f'working on chunk_index {i}')
dapa_body = {
"provider_id": self.__provider_id,
"features": features_chunk
}
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
extracted_ids = [k['id'] for k in features_chunk]
LOGGER.debug(f'checking following IDs: {extracted_ids}')
status_checker = CatalogingGranulesStatusChecker(features_chunk[0]['collection'],
extracted_ids,
TimeUtils().get_datetime_obj().timestamp(),
self.__delaying_second,
self.__repeating_times,
self.__verify_ssl)
status_result = status_checker.verify_n_times()
LOGGER.debug(f'chunk_index {i} status_result: {status_result}')
response_jsons.append({
'cataloging_request_status': dapa_ingest_result,
'status_result': status_result
})
return json.dumps(response_jsons)
@@ -13,7 +13,7 @@ def __init__(self, collection_id: str, granules_ids: list, threshold_datetime: i
self.__granules_ids = granules_ids
self.__threshold_datetime = threshold_datetime
self.__delay = delay
self.__repeating_times = repeating_times if repeating_times > 0 else 1
self.__repeating_times = repeating_times if repeating_times >= 0 else 0
self.__dapa_client = DapaClient().with_verify_ssl(veriffy_ssl)
self.__registered_granules = {}

@@ -2,7 +2,7 @@
import logging
import os
from abc import ABC, abstractmethod
from multiprocessing import Queue
from multiprocessing import Queue, Manager

from pystac import ItemCollection, Asset, Item

@@ -40,12 +40,11 @@ def execute_job(self, granule_item, lock) -> bool:
value_dict.href = os.path.join('.', os.path.basename(downloading_url))
new_asset_dict[name] = value_dict
granule_item.assets = new_asset_dict
with lock:
self.__result_list.put(granule_item)
self.__result_list.put(granule_item)
except Exception as e:
LOGGER.exception(f'error downloading granule: {granule_item.id}')
with lock:
self.__error_list.put({'error': str(e), 'id': granule_item.id, })
self.__error_list.put({'error': str(e), 'id': granule_item.id, })
LOGGER.debug(f'done DownloadItemExecutor#execute_job')
return True # always return true?


@@ -102,8 +101,8 @@ def download(self, **kwargs) -> str:
return json.dumps(granules_json_dict)
# local_items = []
# error_list = []
local_items = Queue()
error_list = Queue()
local_items = Manager().Queue()
error_list = Manager().Queue()
job_manager_props = JobManagerProps()
for each_item in self._granules_json.items:
job_manager_props.memory_job_dict[each_item.id] = each_item
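
A likely reason for the queue swap above (assumed rationale; the PR itself does not spell it out): with a plain `multiprocessing.Queue`, a worker that has put items cannot exit until its feeder thread flushes them to the pipe, so joining workers before draining a large result queue can deadlock — which matches "download is stuck when there are large number of files". A manager-backed queue is proxied through a separate server process, so joins no longer depend on the queue's pipe buffer. A minimal, self-contained sketch:

```python
# Minimal sketch (assumed simplification of the fix): results flow through a
# manager-backed queue, so the parent's join() does not depend on draining a pipe buffer.
from multiprocessing import Manager, Process


def worker(result_queue, count):
    # Each worker reports many results; with a plain multiprocessing.Queue a worker
    # could block on exit until these items are flushed, stalling the parent's join().
    for i in range(count):
        result_queue.put({'granule_id': f'item-{i}'})


if __name__ == '__main__':
    results = Manager().Queue()
    workers = [Process(target=worker, args=(results, 5_000), daemon=True) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()            # safe even though nothing has been consumed yet
    print(results.qsize())  # 20000
```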
7 changes: 7 additions & 0 deletions cumulus_lambda_functions/stage_in_out/stage_in_out_utils.py
@@ -22,3 +22,10 @@ def write_output_to_file(output_json: Union[dict, str, list]):
with open(output_filepath, 'w') as ff:
ff.write(output_str)
return

@staticmethod
def chunk_list(input_list, chunked_size):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(input_list), chunked_size):
yield input_list[i:i + chunked_size]
return
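
A small usage sketch of the new helper (the sample granule list is made up):

```python
# Usage sketch for StageInOutUtils.chunk_list: 12 fake granules in batches of 5.
from cumulus_lambda_functions.stage_in_out.stage_in_out_utils import StageInOutUtils

granules = [{'id': f'granule-{i}'} for i in range(12)]
batches = list(StageInOutUtils.chunk_list(granules, 5))
print([len(b) for b in batches])  # [5, 5, 2]
```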
19 changes: 17 additions & 2 deletions cumulus_lambda_functions/uds_api/collections_api.py
@@ -19,8 +19,23 @@

@router.put("")
@router.put("/")
async def ingest_cnm_dapa(request: Request, new_cnm_body: CnmRequestBody):
async def ingest_cnm_dapa(request: Request, new_cnm_body: CnmRequestBody, response: Response):
LOGGER.debug(f'starting ingest_cnm_dapa')
try:
cnm_prep_result = CollectionsDapaCnm(new_cnm_body.model_dump()).start_facade(request.url)
except Exception as e:
LOGGER.exception('failed during ingest_cnm_dapa')
raise HTTPException(status_code=500, detail=str(e))
if cnm_prep_result['statusCode'] < 300:
response.status_code = cnm_prep_result['statusCode']
return cnm_prep_result['body']
raise HTTPException(status_code=cnm_prep_result['statusCode'], detail=cnm_prep_result['body'])


@router.put("/actual")
@router.put("/actual/")
async def ingest_cnm_dapa_actual(request: Request, new_cnm_body: CnmRequestBody):
LOGGER.debug(f'starting ingest_cnm_dapa_actual')
try:
collections_dapa_cnm = CollectionsDapaCnm(new_cnm_body.model_dump())
cnm_result = collections_dapa_cnm.start()
@@ -76,4 +91,4 @@ async def query_collections(request: Request, collection_id: Union[str, None] =
raise HTTPException(status_code=500, detail=str(e))
if collections_result['statusCode'] == 200:
return collections_result['body']
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])
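
With the split above, the original route becomes a thin facade that answers 202 immediately, while `/actual` keeps the previous synchronous behaviour. An illustrative client call (base URL, token, and body fields are assumptions, not taken from this PR):

```python
# Illustrative call against the new facade endpoint; the base URL, token, and the exact
# CnmRequestBody fields are assumptions, not part of this diff.
import requests

resp = requests.put(
    'https://uds.example.com/collections',          # assumed mount point of collections_api.router
    headers={'Authorization': 'Bearer <token>'},
    json={'provider_id': 'unity', 'features': []},  # stand-in payload shaped like the DAPA body
)
assert resp.status_code == 202, resp.text           # facade accepts and defers to /actual
```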
70 changes: 68 additions & 2 deletions cumulus_lambda_functions/uds_api/dapa/collections_dapa_cnm.py
@@ -2,6 +2,9 @@
import os
from typing import Union

from cumulus_lambda_functions.lib.aws.aws_lambda import AwsLambda
from starlette.datastructures import URL

from cumulus_lambda_functions.lib.aws.aws_sns import AwsSns

from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
@@ -24,10 +27,73 @@ class CnmRequestBody(BaseModel):

class CollectionsDapaCnm:
def __init__(self, request_body):
if 'SNS_TOPIC_ARN' not in os.environ:
raise EnvironmentError('missing key: SNS_TOPIC_ARN')
required_env = ['SNS_TOPIC_ARN', 'COLLECTION_CREATION_LAMBDA_NAME']
if not all([k in os.environ for k in required_env]):
raise EnvironmentError(f'one or more missing env: {required_env}')
self.__sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
self.__request_body = request_body
self.__collection_cnm_lambda_name = os.environ.get('COLLECTION_CREATION_LAMBDA_NAME', '').strip()


def start_facade(self, current_url: URL):
LOGGER.debug(f'request body: {self.__request_body}')

actual_path = current_url.path
actual_path = actual_path if actual_path.endswith('/') else f'{actual_path}/'
actual_path = f'{actual_path}actual'
LOGGER.info(f'sanity_check')

actual_event = {
'resource': actual_path,
'path': actual_path,
'httpMethod': 'PUT',
'headers': {
'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Authorization': 'Bearer xxx',
'Host': current_url.hostname, 'User-Agent': 'python-requests/2.28.2',
'X-Amzn-Trace-Id': 'Root=1-64a66e90-6fa8b7a64449014639d4f5b4', 'X-Forwarded-For': '44.236.15.58',
'X-Forwarded-Port': '443', 'X-Forwarded-Proto': 'https'},
'multiValueHeaders': {
'Accept': ['*/*'], 'Accept-Encoding': ['gzip, deflate'], 'Authorization': ['Bearer xxx'],
'Host': [current_url.hostname], 'User-Agent': ['python-requests/2.28.2'],
'X-Amzn-Trace-Id': ['Root=1-64a66e90-6fa8b7a64449014639d4f5b4'],
'X-Forwarded-For': ['127.0.0.1'], 'X-Forwarded-Port': ['443'], 'X-Forwarded-Proto': ['https']
},
'queryStringParameters': {},
'multiValueQueryStringParameters': {},
'pathParameters': {},
'stageVariables': None,
'requestContext': {
'resourceId': '',
'authorizer': {'principalId': '', 'integrationLatency': 0},
'resourcePath': actual_path, 'httpMethod': 'PUT',
'extendedRequestId': '', 'requestTime': '',
'path': actual_path, 'accountId': '',
'protocol': 'HTTP/1.1', 'stage': '', 'domainPrefix': '', 'requestTimeEpoch': 0,
'requestId': '',
'identity': {
'cognitoIdentityPoolId': None, 'accountId': None, 'cognitoIdentityId': None, 'caller': None,
'sourceIp': '127.0.0.1', 'principalOrgId': None, 'accessKey': None,
'cognitoAuthenticationType': None,
'cognitoAuthenticationProvider': None, 'userArn': None, 'userAgent': 'python-requests/2.28.2',
'user': None
},
'domainName': current_url.hostname, 'apiId': ''
},
'body': json.dumps(self.__request_body),
'isBase64Encoded': False
}
LOGGER.info(f'actual_event: {actual_event}')
response = AwsLambda().invoke_function(
function_name=self.__collection_cnm_lambda_name,
payload=actual_event,
)
LOGGER.debug(f'async function started: {response}')
return {
'statusCode': 202,
'body': {
'message': 'processing'
}
}

def start(self):
"""
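
`start_facade` builds an API-Gateway-style event and hands it to another Lambda via `AwsLambda().invoke_function`, whose implementation is not shown in this diff. A minimal sketch of what such an asynchronous hand-off typically looks like with boto3 (assuming the wrapper uses an `Event`-type invocation; names are placeholders):

```python
# Sketch of an asynchronous ("Event") Lambda invocation; this is an assumption about
# what the AwsLambda.invoke_function wrapper does, not code from this repository.
import json
import boto3


def invoke_async(function_name: str, event: dict) -> int:
    client = boto3.client('lambda')
    response = client.invoke(
        FunctionName=function_name,
        InvocationType='Event',                     # queue the event and return immediately
        Payload=json.dumps(event).encode('utf-8'),
    )
    return response['StatusCode']                   # 202 when the event is accepted


# status = invoke_async('collection-creation-lambda', actual_event)  # names are illustrative
```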
2 changes: 1 addition & 1 deletion cumulus_lambda_functions/uds_api/granules_api.py
@@ -29,4 +29,4 @@ async def get_granules_dapa(request: Request, collection_id: str, limit: Union[i
raise HTTPException(status_code=500, detail=str(e))
if granules_result['statusCode'] == 200:
return granules_result['body']
raise HTTPException(status_code=granules_result['statusCode'], detail=granules_result['body'])
raise HTTPException(status_code=granules_result['statusCode'], detail=granules_result['body'])
2 changes: 1 addition & 1 deletion cumulus_lambda_functions/uds_api/routes_api.py
@@ -10,4 +10,4 @@
main_router = APIRouter(redirect_slashes=False)
# main_router.include_router(setup_es.router)
main_router.include_router(collections_api.router)
main_router.include_router(granules_api.router)
main_router.include_router(granules_api.router)
1 change: 1 addition & 0 deletions docker/stage-in-stage-out/dc-004-catalog.yaml
@@ -30,6 +30,7 @@ services:

DELAY_SECOND: '45'
REPEAT_TIMES: '5'
CHUNK_SIZE: '5'

networks:
- internal
2 changes: 1 addition & 1 deletion setup.py
@@ -22,7 +22,7 @@

setup(
name="cumulus_lambda_functions",
version="5.2.3",
version="5.3.1",
packages=find_packages(),
install_requires=install_requires,
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],
5 changes: 2 additions & 3 deletions tests/cumulus_lambda_functions/uds_api/test_uds_api.py
@@ -113,7 +113,7 @@ def test_create_new_collection(self):
# TODO check if collection shows up
return

def test_cnm(self):
def test_cnm_facade(self):
os.environ[Constants.USERNAME] = '/unity/uds/user/wphyo/username'
os.environ[Constants.PASSWORD] = '/unity/uds/user/wphyo/dwssap'
os.environ[Constants.PASSWORD_TYPE] = Constants.PARAM_STORE
@@ -151,6 +151,5 @@ def test_cnm(self):
headers=headers,
json=stac_collection,
)
self.assertEqual(query_result.status_code, 200, f'wrong status code. {query_result.text}')
self.assertEqual(query_result.status_code, 202, f'wrong status code. {query_result.text}')
return
