Skip to content

fix: Cataloging large number asynchronously by batch + download is stuck when there are large number of files #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: use Manager().Q instead of normal Q to fix it for download stuck…
… for large granules
  • Loading branch information
wphyojpl committed Aug 15, 2023
commit 2983ae649ab6b2ce8edd407bf6300f20d430d878
2 changes: 1 addition & 1 deletion ci.cd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export VERSION ?= latest
all: build_lambda upload_lambda update_lambda_function build_docker
local: build_lambda upload_lambda update_lambda_function_1 update_lambda_function_2 update_lambda_function_3
build_docker:
docker build -t "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" -f docker/Dockerfile .
docker build --no-cache -t "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" -f docker/Dockerfile_download_granules.jpl .

zip_docker:
docker save "$(IMAGE_PREFIX)/$(NAME):$(VERSION)" | gzip > "$(NAME)__$(VERSION).tar.gz"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
import multiprocessing
multiprocessing.set_start_method("fork")
from multiprocessing import Process, Queue, Lock, cpu_count
from multiprocessing import Process, Queue, Lock, cpu_count, Manager
from random import randint

from cumulus_lambda_functions.lib.processing_jobs.job_executor_abstract import JobExecutorAbstract
Expand Down Expand Up @@ -142,13 +142,14 @@ def __execute_job(self):
else:
LOGGER.debug(f'executed job: `{job}` ends in Error. putting back to jobs dir')
self.__props.job_manager.put_back_failed_job(job_path)
LOGGER.debug(f'quitting __execute_job')
return

def start(self):
if self.__props.job_executor is None or self.__props.job_manager is None:
raise RuntimeError('missing job_executor or job_manager')
LOGGER.info(f'multithread processing starting with process_count: {self.__props.process_count}')
for i in range(cpu_count()):
for i in range(self.__props.process_count):
p = Process(target=self.__execute_job, args=())
p.daemon = True
self.__props.consumers.append(p)
Expand All @@ -162,4 +163,5 @@ def start(self):
for c in self.__props.consumers: # to check if all consumers are done processing it
LOGGER.info('joining consumers: {}. exit_code: {}'.format(c.pid, c.exitcode))
c.join()
LOGGER.debug('joined')
return
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
from abc import ABC, abstractmethod
from multiprocessing import Queue
from multiprocessing import Queue, Manager

from pystac import ItemCollection, Asset, Item

Expand Down Expand Up @@ -40,12 +40,11 @@ def execute_job(self, granule_item, lock) -> bool:
value_dict.href = os.path.join('.', os.path.basename(downloading_url))
new_asset_dict[name] = value_dict
granule_item.assets = new_asset_dict
with lock:
self.__result_list.put(granule_item)
self.__result_list.put(granule_item)
except Exception as e:
LOGGER.exception(f'error downloading granule: {granule_item.id}')
with lock:
self.__error_list.put({'error': str(e), 'id': granule_item.id, })
self.__error_list.put({'error': str(e), 'id': granule_item.id, })
LOGGER.debug(f'done DownloadItemExecutor#execute_job')
return True # always return true?


Expand Down Expand Up @@ -102,8 +101,8 @@ def download(self, **kwargs) -> str:
return json.dumps(granules_json_dict)
# local_items = []
# error_list = []
local_items = Queue()
error_list = Queue()
local_items = Manager().Queue()
error_list = Manager().Queue()
job_manager_props = JobManagerProps()
for each_item in self._granules_json.items:
job_manager_props.memory_job_dict[each_item.id] = each_item
Expand Down
43 changes: 43 additions & 0 deletions tests/integration_tests/test_docker_entry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import logging

logging.basicConfig(level=10, format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")

import json
import os
import tempfile
Expand Down Expand Up @@ -1298,6 +1302,45 @@ def test_02_download__from_file(self):
self.assertTrue(FileUtils.file_exist(os.environ['OUTPUT_FILE']), f'missing output file')
return

def test_02_download__from_file_large(self):
granule_json = FileUtils.read_json('./stage-in.json')
if len(argv) > 1:
argv.pop(-1)
argv.append('DOWNLOAD')
os.environ[Constants.EDL_USERNAME] = '/unity/uds/user/wphyo/edl_username'
os.environ[Constants.EDL_PASSWORD] = '/unity/uds/user/wphyo/edl_dwssap'
os.environ[Constants.EDL_PASSWORD_TYPE] = Constants.PARAM_STORE
os.environ[Constants.EDL_BASE_URL] = 'urs.earthdata.nasa.gov'
os.environ['STAC_JSON'] = json.dumps(granule_json)
os.environ['GRANULES_DOWNLOAD_TYPE'] = 'DAAC'
os.environ['PARALLEL_COUNT'] = '5'

with tempfile.TemporaryDirectory() as tmp_dir_name:
print(tmp_dir_name)
os.environ['OUTPUT_FILE'] = os.path.join(tmp_dir_name, 'some_output', 'output.json')
granule_json_file = os.path.join(tmp_dir_name, 'input_file.json')
downloading_dir = os.path.join(tmp_dir_name, 'downloading_dir')
FileUtils.mk_dir_p(downloading_dir)
FileUtils.write_json(granule_json_file, granule_json)
os.environ['STAC_JSON'] = granule_json_file
os.environ['DOWNLOAD_DIR'] = downloading_dir
download_result_str = choose_process()
download_result = json.loads(download_result_str)
print(len(download_result['features']))
self.assertTrue('features' in download_result, f'missing features in download_result')
self.assertEqual(len(download_result['features']) + 1, len(glob(os.path.join(downloading_dir, '*'))), f'downloaded file does not match')
error_file = os.path.join(downloading_dir, 'error.log')
if FileUtils.file_exist(error_file):
self.assertTrue(False, f'some downloads failed. error.log exists. {FileUtils.read_json(error_file)}')
download_result = download_result['features']
self.assertTrue('assets' in download_result[0], f'no assets in download_result: {download_result}')
downloaded_file_hrefs = set([k['assets']['data']['href'] for k in download_result])
for each_granule in zip(granule_json['features'], download_result):
remote_filename = os.path.basename(each_granule[0]['assets']['data']['href'])
self.assertTrue(os.path.join('.', remote_filename) in downloaded_file_hrefs, f'mismatched: {remote_filename}')
self.assertTrue(FileUtils.file_exist(os.environ['OUTPUT_FILE']), f'missing output file')
return

def test_02_download__from_http(self):
granule_json = '{"numberMatched": 20, "numberReturned": 20, "stac_version": "1.0.0", "type": "FeatureCollection", "links": [{"rel": "self", "href": "https://58nbcawrvb.execute-api.us-west-2.amazonaws.com/test/am-uds-dapa/collections/SNDR_SNPP_ATMS_L1A___1/items?datetime=2016-01-14T08:00:00Z/2016-01-14T11:59:59Z&limit=100&offset=0"}, {"rel": "root", "href": "https://58nbcawrvb.execute-api.us-west-2.amazonaws.com"}, {"rel": "next", "href": "https://58nbcawrvb.execute-api.us-west-2.amazonaws.com/test/am-uds-dapa/collections/SNDR_SNPP_ATMS_L1A___1/items?datetime=2016-01-14T08:00:00Z/2016-01-14T11:59:59Z&limit=100&offset=100"}, {"rel": "prev", "href": "https://58nbcawrvb.execute-api.us-west-2.amazonaws.com/test/am-uds-dapa/collections/SNDR_SNPP_ATMS_L1A___1/items?datetime=2016-01-14T08:00:00Z/2016-01-14T11:59:59Z&limit=100&offset=0"}], "features": [{"type": "Feature", "stac_version": "1.0.0", "id": "SNDR.SNPP.ATMS.L1A.nominal2.01", "properties": {"start_datetime": "2016-01-14T09:54:00Z", "end_datetime": "2016-01-14T10:00:00Z", "created": "2020-12-14T13:50:00Z", "updated": "2022-08-15T06:26:39.830000Z", "datetime": "2022-08-15T06:26:37.029000Z"}, "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "links": [{"rel": "collection", "href": "."}], "assets": {"data": {"href": "https://raw.githubusercontent.com/unity-sds/unity-data-services/develop/README.md", "title": "SNDR.SNPP.ATMS.L1A.nominal2.01.nc", "description": "SNDR.SNPP.ATMS.L1A.nominal2.01.nc"}, "metadata__data": {"href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.01.nc.cas", "title": "SNDR.SNPP.ATMS.L1A.nominal2.01.nc.cas", "description": "SNDR.SNPP.ATMS.L1A.nominal2.01.nc.cas"}, "metadata__cmr": {"href": "s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.01.cmr.xml", "title": "SNDR.SNPP.ATMS.L1A.nominal2.01.cmr.xml", "description": "SNDR.SNPP.ATMS.L1A.nominal2.01.cmr.xml"}}, "bbox": [0.0, 0.0, 0.0, 0.0], "stac_extensions": [], "collection": "SNDR_SNPP_ATMS_L1A___1"}, {"type": "Feature", "stac_version": "1.0.0", "id": "SNDR.SNPP.ATMS.L1A.nominal2.08", "properties": {"start_datetime": "2016-01-14T10:36:00Z", "end_datetime": "2016-01-14T10:42:00Z", "created": "2020-12-14T13:50:00Z", "updated": "2022-08-15T06:26:26.078000Z", "datetime": "2022-08-15T06:26:19.333000Z"}, "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "links": [{"rel": "collection", "href": "."}], "assets": {"data": {"href": "https://raw.githubusercontent.com/unity-sds/unity-data-services/develop/CHANGELOG.md", "title": "SNDR.SNPP.ATMS.L1A.nominal2.08.nc", "description": "SNDR.SNPP.ATMS.L1A.nominal2.08.nc"}, "metadata__data": {"href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.08.nc.cas", "title": "SNDR.SNPP.ATMS.L1A.nominal2.08.nc.cas", "description": "SNDR.SNPP.ATMS.L1A.nominal2.08.nc.cas"}, "metadata__cmr": {"href": "s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.08.cmr.xml", "title": "SNDR.SNPP.ATMS.L1A.nominal2.08.cmr.xml", "description": "SNDR.SNPP.ATMS.L1A.nominal2.08.cmr.xml"}}, "bbox": [0.0, 0.0, 0.0, 0.0], "stac_extensions": [], "collection": "SNDR_SNPP_ATMS_L1A___1"}, {"type": "Feature", "stac_version": "1.0.0", "id": "SNDR.SNPP.ATMS.L1A.nominal2.06", "properties": {"start_datetime": "2016-01-14T10:24:00Z", "end_datetime": "2016-01-14T10:30:00Z", "created": "2020-12-14T13:50:00Z", "updated": "2022-08-15T06:26:26.068000Z", "datetime": "2022-08-15T06:26:18.641000Z"}, "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "links": [{"rel": "collection", "href": "."}], "assets": {"data": {"href": "https://raw.githubusercontent.com/unity-sds/unity-data-services/develop/CODE_OF_CONDUCT.md", "title": "SNDR.SNPP.ATMS.L1A.nominal2.06.nc", "description": "SNDR.SNPP.ATMS.L1A.nominal2.06.nc"}, "metadata__data": {"href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.06.nc.cas", "title": "SNDR.SNPP.ATMS.L1A.nominal2.06.nc.cas", "description": "SNDR.SNPP.ATMS.L1A.nominal2.06.nc.cas"}, "metadata__cmr": {"href": "s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.06.cmr.xml", "title": "SNDR.SNPP.ATMS.L1A.nominal2.06.cmr.xml", "description": "SNDR.SNPP.ATMS.L1A.nominal2.06.cmr.xml"}}, "bbox": [0.0, 0.0, 0.0, 0.0], "stac_extensions": [], "collection": "SNDR_SNPP_ATMS_L1A___1"}, {"type": "Feature", "stac_version": "1.0.0", "id": "SNDR.SNPP.ATMS.L1A.nominal2.18", "properties": {"start_datetime": "2016-01-14T11:36:00Z", "end_datetime": "2016-01-14T11:42:00Z", "created": "2020-12-14T13:50:00Z", "updated": "2022-08-15T06:26:26.060000Z", "datetime": "2022-08-15T06:26:19.698000Z"}, "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "links": [{"rel": "collection", "href": "."}], "assets": {"data": {"href": "https://raw.githubusercontent.com/unity-sds/unity-data-services/develop/CONTRIBUTING.md", "title": "SNDR.SNPP.ATMS.L1A.nominal2.18.nc", "description": "SNDR.SNPP.ATMS.L1A.nominal2.18.nc"}, "metadata__data": {"href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.18.nc.cas", "title": "SNDR.SNPP.ATMS.L1A.nominal2.18.nc.cas", "description": "SNDR.SNPP.ATMS.L1A.nominal2.18.nc.cas"}, "metadata__cmr": {"href": "s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.18.cmr.xml", "title": "SNDR.SNPP.ATMS.L1A.nominal2.18.cmr.xml", "description": "SNDR.SNPP.ATMS.L1A.nominal2.18.cmr.xml"}}, "bbox": [0.0, 0.0, 0.0, 0.0], "stac_extensions": [], "collection": "SNDR_SNPP_ATMS_L1A___1"}, {"type": "Feature", "stac_version": "1.0.0", "id": "SNDR.SNPP.ATMS.L1A.nominal2.04", "properties": {"start_datetime": "2016-01-14T10:12:00Z", "end_datetime": "2016-01-14T10:18:00Z", "created": "2020-12-14T13:50:00Z", "updated": "2022-08-15T06:26:26.050000Z", "datetime": "2022-08-15T06:26:19.491000Z"}, "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "links": [{"rel": "collection", "href": "."}], "assets": {"data": {"href": "https://raw.githubusercontent.com/unity-sds/unity-data-services/develop/LICENSE", "title": "SNDR.SNPP.ATMS.L1A.nominal2.04.nc", "description": "SNDR.SNPP.ATMS.L1A.nominal2.04.nc"}, "metadata__data": {"href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.04.nc.cas", "title": "SNDR.SNPP.ATMS.L1A.nominal2.04.nc.cas", "description": "SNDR.SNPP.ATMS.L1A.nominal2.04.nc.cas"}, "metadata__cmr": {"href": "s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.04.cmr.xml", "title": "SNDR.SNPP.ATMS.L1A.nominal2.04.cmr.xml", "description": "SNDR.SNPP.ATMS.L1A.nominal2.04.cmr.xml"}}, "bbox": [0.0, 0.0, 0.0, 0.0], "stac_extensions": [], "collection": "SNDR_SNPP_ATMS_L1A___1"}]}'
granule_json = json.loads(granule_json)
Expand Down