feat: Checkout stage with STAC catalog json #138

Merged · 10 commits · Apr 24, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.5.0] - 2023-04-18
### Changed
- [#138](https://github.com/unity-sds/unity-data-services/pull/138) feat: Checkout stage with STAC catalog json

## [3.4.0] - 2023-04-17
### Changed
- [#132](https://github.com/unity-sds/unity-data-services/pull/132) feat: add DAAC download logic
34 changes: 34 additions & 0 deletions cumulus_lambda_functions/cumulus_stac/granules_catalog.py
@@ -0,0 +1,34 @@
from pystac import Catalog, Item, Asset

from cumulus_lambda_functions.lib.utils.file_utils import FileUtils


class GranulesCatalog:

def get_child_link_hrefs(self, catalog_file_path: str):
if not FileUtils.file_exist(catalog_file_path):
raise ValueError(f'missing file: {catalog_file_path}')
catalog = FileUtils.read_json(catalog_file_path)
catalog = Catalog.from_dict(catalog)
return [k.href for k in catalog.get_links(rel='child')]

def get_granules_item(self, granule_stac_json) -> Item:
if not FileUtils.file_exist(granule_stac_json):
raise ValueError(f'missing file: {granule_stac_json}')
granules_stac = FileUtils.read_json(granule_stac_json)
granules_stac = Item.from_dict(granules_stac)
return granules_stac

def extract_assets_href(self, granules_stac: Item) -> dict:
assets = {k: v.href for k, v in granules_stac.get_assets().items()}
return assets

def update_assets_href(self, granules_stac: Item, new_assets: dict):
for k, v in new_assets.items():
if k in granules_stac.assets:
existing_asset = granules_stac.assets.get(k)
existing_asset.href = v
else:
                existing_asset = Asset(v, k)  # pystac Asset(href, title): href=v, the asset key k doubles as the title
granules_stac.add_asset(k, existing_asset)
return self
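
A minimal usage sketch for GranulesCatalog, assuming a local STAC catalog.json whose child links point to granule Item files (the /tmp/stage-out path is a hypothetical example, not part of this PR):

from cumulus_lambda_functions.cumulus_stac.granules_catalog import GranulesCatalog

gc = GranulesCatalog()
for item_href in gc.get_child_link_hrefs('/tmp/stage-out/catalog.json'):  # assumed example path
    granule_item = gc.get_granules_item(item_href)   # pystac.Item loaded from the child json
    assets = gc.extract_assets_href(granule_item)    # e.g. {'data': '.../granule.nc', 'metadata': '.../granule.xml'}
    print(granule_item.id, assets)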
95 changes: 95 additions & 0 deletions cumulus_lambda_functions/stage_in_out/upload_granules_by_catalog_s3.py
@@ -0,0 +1,95 @@
import json

from cumulus_lambda_functions.cumulus_stac.granules_catalog import GranulesCatalog
from cumulus_lambda_functions.stage_in_out.search_collections_factory import SearchCollectionsFactory
from cumulus_lambda_functions.stage_in_out.upload_granules_abstract import UploadGranulesAbstract
import logging
import os
import re
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = logging.getLogger(__name__)


class UploadGranulesByCatalogS3(UploadGranulesAbstract):
CATALOG_FILE = 'CATALOG_FILE'
COLLECTION_ID_KEY = 'COLLECTION_ID'
STAGING_BUCKET_KEY = 'STAGING_BUCKET'
GRANULES_SEARCH_DOMAIN = 'GRANULES_SEARCH_DOMAIN'

VERIFY_SSL_KEY = 'VERIFY_SSL'
DELETE_FILES_KEY = 'DELETE_FILES'

def __init__(self) -> None:
super().__init__()
self.__gc = GranulesCatalog()
self.__collection_id = ''
self.__collection_details = {}
self.__staging_bucket = ''
self.__verify_ssl = True
self.__delete_files = False
self.__s3 = AwsS3()

def __set_props_from_env(self):
missing_keys = [k for k in [self.CATALOG_FILE, self.COLLECTION_ID_KEY, self.GRANULES_SEARCH_DOMAIN, self.STAGING_BUCKET_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')

self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)

self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
return self

def upload(self, **kwargs) -> list:
self.__set_props_from_env()
self.__collection_details = SearchCollectionsFactory().get_class(os.getenv('GRANULES_SEARCH_DOMAIN', 'MISSING_GRANULES_SEARCH_DOMAIN')).search()
self.__collection_details = json.loads(self.__collection_details)

granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction'][0]
child_links = self.__gc.get_child_link_hrefs(os.environ.get(self.CATALOG_FILE))
errors = []
dapa_body_granules = []
for each_child in child_links:
try:
current_granule_stac = self.__gc.get_granules_item(each_child)
current_assets = self.__gc.extract_assets_href(current_granule_stac)
if 'data' not in current_assets:
LOGGER.warning(f'skipping {each_child}. no data in {current_assets}')
continue

                # granuleIdExtraction (from the collection's summaries) is a regex; its first match against the data file name is the granule id
                current_granule_id = re.findall(granule_id_extraction, os.path.basename(current_assets['data']))
if len(current_granule_id) < 1:
LOGGER.warning(f'skipping {each_child}. cannot be matched to granule_id: {current_granule_id}')
continue
current_granule_id = current_granule_id[0]

updating_assets = {}
uploading_current_granule_stac = None
for asset_type, asset_href in current_assets.items():

LOGGER.debug(f'uploading {asset_type}, {asset_href}')
s3_url = self.__s3.upload(asset_href, self.__staging_bucket, f'{self.__collection_id}:{current_granule_id}', self.__delete_files)
if asset_href == each_child:
uploading_current_granule_stac = s3_url
updating_assets[asset_type] = s3_url
self.__gc.update_assets_href(current_granule_stac, updating_assets)
current_granule_stac.id = current_granule_id
current_granule_stac.collection_id = self.__collection_id
if uploading_current_granule_stac is not None: # upload metadata file again
self.__s3.set_s3_url(uploading_current_granule_stac)
self.__s3.upload_bytes(json.dumps(current_granule_stac.to_dict(False, False)).encode())
dapa_body_granules.append({
'id': f'{self.__collection_id}:{current_granule_id}',
'collection': self.__collection_id,
'assets': {k: v.to_dict() for k, v in current_granule_stac.assets.items()},
})
except Exception as e:
LOGGER.exception(f'error while processing: {each_child}')
errors.append({'href': each_child, 'error': str(e)})

if len(errors) > 0:
LOGGER.error(f'some errors while uploading granules: {errors}')
LOGGER.debug(f'dapa_body_granules: {dapa_body_granules}')
return dapa_body_granules
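
A hypothetical invocation sketch for the new uploader; every value below is a placeholder, and the required environment keys mirror the constants defined above:

import os

from cumulus_lambda_functions.stage_in_out.upload_granules_by_catalog_s3 import UploadGranulesByCatalogS3

# placeholder values; CATALOG_FILE must point to a local STAC catalog json
os.environ['CATALOG_FILE'] = '/tmp/stage-out/catalog.json'
os.environ['COLLECTION_ID'] = 'EXAMPLE_COLLECTION___1'
os.environ['STAGING_BUCKET'] = 'example-staging-bucket'
os.environ['GRANULES_SEARCH_DOMAIN'] = 'UNITY'
os.environ['DELETE_FILES'] = 'FALSE'

dapa_granules = UploadGranulesByCatalogS3().upload()  # list of {'id', 'collection', 'assets'} dicts
print(f'uploaded {len(dapa_granules)} granules')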
cumulus_lambda_functions/stage_in_out/upload_granules_factory.py
@@ -1,10 +1,12 @@


class UploadGranulesFactory:
S3 = 'S3'
CATALOG_S3 = 'CATALOG_S3'

def get_class(self, upload_type):
if upload_type == UploadGranulesFactory.S3:
from cumulus_lambda_functions.stage_in_out.upload_granules_s3 import UploadGranulesS3
return UploadGranulesS3()
if upload_type == UploadGranulesFactory.CATALOG_S3:
from cumulus_lambda_functions.stage_in_out.upload_granules_by_catalog_s3 import UploadGranulesByCatalogS3
return UploadGranulesByCatalogS3()
        raise ValueError(f'unknown upload_type: {upload_type}')
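
A short dispatch sketch (hypothetical caller) showing how the GRANULES_UPLOAD_TYPE environment value selects an uploader through the factory:

import os

from cumulus_lambda_functions.stage_in_out.upload_granules_factory import UploadGranulesFactory

upload_type = os.environ.get('GRANULES_UPLOAD_TYPE', UploadGranulesFactory.S3)
uploader = UploadGranulesFactory().get_class(upload_type)  # UploadGranulesS3 or UploadGranulesByCatalogS3
results = uploader.upload()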
5 changes: 3 additions & 2 deletions docker/stage-in-stage-out/dc-003-upload.yaml
@@ -28,8 +28,9 @@ services:
DELETE_FILES: 'FALSE'

GRANULES_SEARCH_DOMAIN: 'UNITY'
-      GRANULES_UPLOAD_TYPE: 'S3'
-      UPLOAD_DIR: '/etc/snpp_upload_test_1'
+      GRANULES_UPLOAD_TYPE: 'S3 or CATALOG_S3'
+      UPLOAD_DIR: '/etc/snpp_upload_test_1 or empty string'
+      CATALOG_FILE: 'empty string or /path/to/stac/catalog file'
LOG_LEVEL: '20'
networks:
- internal
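For reference, a minimal pystac sketch (assumed paths) of producing the kind of catalog json that CATALOG_FILE should point to: a STAC Catalog whose child links reference per-granule Item json files on local disk.

from pystac import Catalog, Link

catalog = Catalog(id='stage-out-catalog', description='granules to stage out')
# each 'child' link href must resolve to a granule STAC Item json on disk
catalog.add_link(Link(rel='child', target='/tmp/stage-out/granule_1/granule_1.json'))
catalog.add_link(Link(rel='child', target='/tmp/stage-out/granule_2/granule_2.json'))
catalog.save_object(include_self_link=False, dest_href='/tmp/stage-out/catalog.json')
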
2 changes: 1 addition & 1 deletion setup.py
@@ -18,7 +18,7 @@

setup(
name="cumulus_lambda_functions",
version="3.4.0",
version="3.5.0",
packages=find_packages(),
install_requires=install_requires,
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],