import json
import logging
import os
import re

from cumulus_lambda_functions.cumulus_stac.granules_catalog import GranulesCatalog
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.stage_in_out.search_collections_factory import SearchCollectionsFactory
from cumulus_lambda_functions.stage_in_out.upload_granules_abstract import UploadGranulesAbstract

LOGGER = logging.getLogger(__name__)


class UploadGranulesByCatalogS3(UploadGranulesAbstract):
    CATALOG_FILE = 'CATALOG_FILE'
    COLLECTION_ID_KEY = 'COLLECTION_ID'
    STAGING_BUCKET_KEY = 'STAGING_BUCKET'
    GRANULES_SEARCH_DOMAIN = 'GRANULES_SEARCH_DOMAIN'

    VERIFY_SSL_KEY = 'VERIFY_SSL'
    DELETE_FILES_KEY = 'DELETE_FILES'
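
    # Expected environment variables (description added for clarity; example
    # values are illustrative assumptions, not part of the original change):
    #   CATALOG_FILE           - path/URL of the STAC catalog listing granule items
    #   COLLECTION_ID          - target collection ID
    #   STAGING_BUCKET         - S3 bucket that receives the uploaded assets
    #   GRANULES_SEARCH_DOMAIN - key used by SearchCollectionsFactory.get_class
    #   VERIFY_SSL   (optional, default TRUE)
    #   DELETE_FILES (optional, default FALSE) - forwarded to AwsS3.upload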

    def __init__(self) -> None:
        super().__init__()
        self.__gc = GranulesCatalog()
        self.__collection_id = ''
        self.__collection_details = {}
        self.__staging_bucket = ''
        self.__verify_ssl = True
        self.__delete_files = False
        self.__s3 = AwsS3()

    def __set_props_from_env(self):
        missing_keys = [k for k in [self.CATALOG_FILE, self.COLLECTION_ID_KEY, self.GRANULES_SEARCH_DOMAIN, self.STAGING_BUCKET_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)

        # optional flags: only a case-insensitive 'TRUE' enables them
        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
        return self

    def upload(self, **kwargs) -> list:
        self.__set_props_from_env()
        # the collection details carry the granuleIdExtraction regex in their summaries;
        # the env key is guaranteed present by __set_props_from_env, so no fallback is needed
        self.__collection_details = json.loads(
            SearchCollectionsFactory().get_class(os.environ.get(self.GRANULES_SEARCH_DOMAIN)).search())

        granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction'][0]
        child_links = self.__gc.get_child_link_hrefs(os.environ.get(self.CATALOG_FILE))
        errors = []
        dapa_body_granules = []
        for each_child in child_links:
            try:
                current_granule_stac = self.__gc.get_granules_item(each_child)
                current_assets = self.__gc.extract_assets_href(current_granule_stac)
                if 'data' not in current_assets:
                    LOGGER.warning(f'skipping {each_child}. no data in {current_assets}')
                    continue

                current_granule_id = re.findall(granule_id_extraction, os.path.basename(current_assets['data']))
                if len(current_granule_id) < 1:
                    LOGGER.warning(f'skipping {each_child}. cannot be matched to granule_id: {current_granule_id}')
                    continue
                current_granule_id = current_granule_id[0]
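                # (illustration, not from the original change: a hypothetical
                # granuleIdExtraction pattern r'^(.*)\.data\.nc$' applied to
                # 'G123.data.nc' yields ['G123'], so current_granule_id is 'G123')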

                updating_assets = {}
                uploading_current_granule_stac = None
                for asset_type, asset_href in current_assets.items():
                    LOGGER.debug(f'uploading {asset_type}, {asset_href}')
                    s3_url = self.__s3.upload(asset_href, self.__staging_bucket, f'{self.__collection_id}:{current_granule_id}', self.__delete_files)
                    if asset_href == each_child:  # this asset is the granule's own STAC metadata file
                        uploading_current_granule_stac = s3_url
                    updating_assets[asset_type] = s3_url
                self.__gc.update_assets_href(current_granule_stac, updating_assets)
                current_granule_stac.id = current_granule_id
                current_granule_stac.collection_id = self.__collection_id
                if uploading_current_granule_stac is not None:  # re-upload the metadata file so it reflects the updated assets
                    self.__s3.set_s3_url(uploading_current_granule_stac)
                    self.__s3.upload_bytes(json.dumps(current_granule_stac.to_dict(False, False)).encode())
                dapa_body_granules.append({
                    'id': f'{self.__collection_id}:{current_granule_id}',
                    'collection': self.__collection_id,
                    'assets': {k: v.to_dict() for k, v in current_granule_stac.assets.items()},
                })
            except Exception as e:
                LOGGER.exception(f'error while processing: {each_child}')
                errors.append({'href': each_child, 'error': str(e)})

        if len(errors) > 0:
            LOGGER.error(f'some errors while uploading granules: {errors}')
        LOGGER.debug(f'dapa_body_granules: {dapa_body_granules}')
        return dapa_body_granules
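

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original change). The environment
# values below are illustrative assumptions only; real deployments set them
# via the Lambda/task configuration.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    os.environ.setdefault('CATALOG_FILE', '/tmp/catalog.json')          # hypothetical local catalog path
    os.environ.setdefault('COLLECTION_ID', 'SAMPLE_COLLECTION___001')   # hypothetical collection ID
    os.environ.setdefault('STAGING_BUCKET', 'sample-staging-bucket')    # hypothetical bucket name
    os.environ.setdefault('GRANULES_SEARCH_DOMAIN', 'SAMPLE_DOMAIN')    # hypothetical factory key
    uploaded = UploadGranulesByCatalogS3().upload()
    print(json.dumps(uploaded, indent=2))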