Skip to content

release/3.6.0 #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.6.0] - 2023-04-24
### Added
- [#142](https://github.com/unity-sds/unity-data-services/pull/142) feat: Support DAAC download from a STAC catalog file, not just direct JSON text

## [3.5.0] - 2023-04-18
### Added
- [#138](https://github.com/unity-sds/unity-data-services/pull/138) feat: Checkout stage with STAC catalog json

## [3.4.0] - 2023-04-17
### Changed
### Added
- [#132](https://github.com/unity-sds/unity-data-services/pull/132) feat: add DAAC download logic

## [3.3.1] - 2023-04-13
Expand Down
34 changes: 34 additions & 0 deletions cumulus_lambda_functions/cumulus_stac/granules_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from pystac import Catalog, Item, Asset

from cumulus_lambda_functions.lib.utils.file_utils import FileUtils


class GranulesCatalog:
    """Helpers for reading and updating STAC catalog / granule-item JSON files on disk."""

    def get_child_link_hrefs(self, catalog_file_path: str):
        """Return the href of every `child` link in the catalog stored at *catalog_file_path*.

        Raises ValueError when the file does not exist.
        """
        if not FileUtils.file_exist(catalog_file_path):
            raise ValueError(f'missing file: {catalog_file_path}')
        parsed_catalog = Catalog.from_dict(FileUtils.read_json(catalog_file_path))
        child_links = parsed_catalog.get_links(rel='child')
        return [each_link.href for each_link in child_links]

    def get_granules_item(self, granule_stac_json) -> Item:
        """Load and return the STAC Item stored in the JSON file *granule_stac_json*.

        Raises ValueError when the file does not exist.
        """
        if not FileUtils.file_exist(granule_stac_json):
            raise ValueError(f'missing file: {granule_stac_json}')
        return Item.from_dict(FileUtils.read_json(granule_stac_json))

    def extract_assets_href(self, granules_stac: Item) -> dict:
        """Return a mapping of asset key -> asset href for *granules_stac*."""
        return {name: asset.href for name, asset in granules_stac.get_assets().items()}

    def update_assets_href(self, granules_stac: Item, new_assets: dict):
        """Overwrite (or add) asset hrefs on *granules_stac* from *new_assets*.

        Existing assets keep all their other fields; unknown keys become new
        Assets. Returns self so calls can be chained.
        """
        for asset_name, asset_href in new_assets.items():
            existing_asset = granules_stac.assets.get(asset_name)
            if existing_asset is not None:
                existing_asset.href = asset_href
            else:
                granules_stac.add_asset(asset_name, Asset(asset_href, asset_name))
        return self
Original file line number Diff line number Diff line change
@@ -1,7 +1,43 @@
import json
import logging
import os
from abc import ABC, abstractmethod

from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranulesAbstract(ABC):
    """Base class for granule downloaders.

    Resolves the local download directory and the granules' STAC JSON from
    environment variables; concrete subclasses implement `download`.
    """

    STAC_JSON = 'STAC_JSON'  # env key: raw STAC JSON text OR a path to a JSON file
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'  # env key: local directory to download into

    def __init__(self) -> None:
        super().__init__()
        self._granules_json = []  # parsed STAC granules; filled by _retrieve_stac_json
        self._download_dir = '/tmp'  # overwritten by _setup_download_dir

    def _setup_download_dir(self):
        """Read DOWNLOAD_DIR from the environment, drop one trailing slash and
        create the directory (mkdir -p). Returns self for chaining.

        NOTE(review): assumes the caller has already verified DOWNLOAD_DIR is
        present (subclasses check missing env keys first); a missing key would
        surface here as an AttributeError on None.
        """
        self._download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
        self._download_dir = self._download_dir[:-1] if self._download_dir.endswith('/') else self._download_dir
        LOGGER.debug(f'creating download dir: {self._download_dir}')
        FileUtils.mk_dir_p(self._download_dir)
        return self

    def _retrieve_stac_json(self):
        """Populate self._granules_json from the STAC_JSON env value.

        The value is either the JSON document itself or a path to a JSON file.
        Raises ValueError when it is neither. Returns self for chaining.
        """
        raw_stac_json = os.environ.get(self.STAC_JSON)
        try:
            # Happy path: the env value is the JSON text itself.
            self._granules_json = json.loads(raw_stac_json)
            return self
        except (json.JSONDecodeError, TypeError):  # was a bare `except:`; TypeError covers a missing (None) env value
            LOGGER.debug(f'raw_stac_json is not STAC_JSON: {raw_stac_json}. trying to see if file exists')
        # Fallback: treat the env value as a path to a JSON file.
        if not FileUtils.file_exist(raw_stac_json):
            raise ValueError(f'missing file or not JSON: {raw_stac_json}')
        self._granules_json = FileUtils.read_json(raw_stac_json)
        if self._granules_json is None:
            raise ValueError(f'{raw_stac_json} is not JSON')
        return self

    @abstractmethod
    def download(self, **kwargs) -> list:
        """Download all granules; return the list of processed download URLs."""
        raise NotImplementedError()
22 changes: 6 additions & 16 deletions cumulus_lambda_functions/stage_in_out/download_granules_daac.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,21 @@
import logging
import os

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranulesDAAC(DownloadGranulesAbstract):
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
STAC_JSON = 'STAC_JSON'

def __init__(self) -> None:
super().__init__()
self.__download_dir = '/tmp'
self.__s3 = AwsS3()
self.__granules_json = []
self.__edl_token = None

def __set_props_from_env(self):
missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')
self.__granules_json = json.loads(os.environ.get(self.STAC_JSON))
self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
self._retrieve_stac_json()
self._setup_download_dir()
self.__edl_token = URSTokenRetriever().start()
return self

Expand Down Expand Up @@ -79,7 +70,7 @@ def __download_one_granule(self, assets: dict):
if r.status_code >= 400:
raise RuntimeError(f'wrong response status: {r.status_code}. details: {r.content}')
# TODO. how to correctly check redirecting to login page
with open(os.path.join(self.__download_dir, os.path.basename(v["href"])), 'wb') as fd:
with open(os.path.join(self._download_dir, os.path.basename(v["href"])), 'wb') as fd:
fd.write(r.content)
except Exception as e:
LOGGER.exception(f'failed to download {v}')
Expand All @@ -89,15 +80,14 @@ def __download_one_granule(self, assets: dict):

def download(self, **kwargs) -> list:
self.__set_props_from_env()
LOGGER.debug(f'creating download dir: {self.__download_dir}')
FileUtils.mk_dir_p(self.__download_dir)
downloading_urls = self.__get_downloading_urls(self.__granules_json)
LOGGER.debug(f'creating download dir: {self._download_dir}')
downloading_urls = self.__get_downloading_urls(self._granules_json)
error_list = []
for each in downloading_urls:
LOGGER.debug(f'working on {each}')
current_error_list = self.__download_one_granule(each)
error_list.extend(current_error_list)
if len(error_list) > 0:
with open(f'{self.__download_dir}/error.log', 'w') as error_file:
with open(f'{self._download_dir}/error.log', 'w') as error_file:
error_file.write(json.dumps(error_list, indent=4))
return downloading_urls
32 changes: 5 additions & 27 deletions cumulus_lambda_functions/stage_in_out/download_granules_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,22 @@
import os

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranulesS3(DownloadGranulesAbstract):
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
STAC_JSON = 'STAC_JSON'

def __init__(self) -> None:
super().__init__()
self.__download_dir = '/tmp'
self.__s3 = AwsS3()
self.__granules_json = []

def __retrieve_stac_json(self):
raw_stac_json = os.environ.get(self.STAC_JSON)
try:
self.__granules_json = json.loads(raw_stac_json)
return self
except:
LOGGER.debug(f'raw_stac_json is not STAC_JSON: {raw_stac_json}. trying to see if file exists')
if not FileUtils.file_exist(raw_stac_json):
raise ValueError(f'missing file or not JSON: {raw_stac_json}')
self.__granules_json = FileUtils.read_json(raw_stac_json)
if self.__granules_json is None:
raise ValueError(f'{raw_stac_json} is not JSON')
return self

def __set_props_from_env(self):
missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')
self.__retrieve_stac_json()
self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
self._retrieve_stac_json()
self._setup_download_dir()
return self

def __get_downloading_urls(self, granules_result: list):
Expand Down Expand Up @@ -81,7 +61,7 @@ def __download_one_granule(self, assets: dict):
for k, v in assets.items():
try:
LOGGER.debug(f'downloading: {v["href"]}')
self.__s3.set_s3_url(v['href']).download(self.__download_dir)
self.__s3.set_s3_url(v['href']).download(self._download_dir)
except Exception as e:
LOGGER.exception(f'failed to download {v}')
v['cause'] = str(e)
Expand All @@ -90,15 +70,13 @@ def __download_one_granule(self, assets: dict):

def download(self, **kwargs) -> list:
self.__set_props_from_env()
LOGGER.debug(f'creating download dir: {self.__download_dir}')
FileUtils.mk_dir_p(self.__download_dir)
downloading_urls = self.__get_downloading_urls(self.__granules_json)
downloading_urls = self.__get_downloading_urls(self._granules_json)
error_list = []
for each in downloading_urls:
LOGGER.debug(f'working on {each}')
current_error_list = self.__download_one_granule(each)
error_list.extend(current_error_list)
if len(error_list) > 0:
with open(f'{self.__download_dir}/error.log', 'w') as error_file:
with open(f'{self._download_dir}/error.log', 'w') as error_file:
error_file.write(json.dumps(error_list, indent=4))
return downloading_urls
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import json

from cumulus_lambda_functions.cumulus_stac.granules_catalog import GranulesCatalog
from cumulus_lambda_functions.stage_in_out.search_collections_factory import SearchCollectionsFactory
from cumulus_lambda_functions.stage_in_out.upload_granules_abstract import UploadGranulesAbstract
import logging
import os
import re
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = logging.getLogger(__name__)


class UploadGranulesByCatalogS3(UploadGranulesAbstract):
    """Upload every granule referenced by a local STAC catalog to a staging S3
    bucket, rewrite the items' asset hrefs to the S3 urls, and return DAPA
    granule dicts for the uploaded granules.
    """

    CATALOG_FILE = 'CATALOG_FILE'  # env key: path to the local STAC catalog json
    COLLECTION_ID_KEY = 'COLLECTION_ID'
    STAGING_BUCKET_KEY = 'STAGING_BUCKET'
    GRANULES_SEARCH_DOMAIN = 'GRANULES_SEARCH_DOMAIN'

    VERIFY_SSL_KEY = 'VERIFY_SSL'
    DELETE_FILES_KEY = 'DELETE_FILES'

    def __init__(self) -> None:
        super().__init__()
        self.__gc = GranulesCatalog()
        self.__collection_id = ''
        self.__collection_details = {}
        self.__staging_bucket = ''
        self.__verify_ssl = True  # NOTE(review): read from env but not used in this class — confirm downstream use
        self.__delete_files = False  # delete local files after a successful upload
        self.__s3 = AwsS3()

    def __set_props_from_env(self):
        """Validate required env keys and cache their values. Returns self."""
        missing_keys = [k for k in [self.CATALOG_FILE, self.COLLECTION_ID_KEY, self.GRANULES_SEARCH_DOMAIN, self.STAGING_BUCKET_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)

        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
        return self

    def __upload_single_granule(self, granule_href: str, granule_id_extraction: str):
        """Upload all assets of the granule item at *granule_href*.

        Returns the DAPA granule dict, or None when the item is skipped
        (no `data` asset, or the granule id regex does not match).
        """
        current_granule_stac = self.__gc.get_granules_item(granule_href)
        current_assets = self.__gc.extract_assets_href(current_granule_stac)
        if 'data' not in current_assets:
            LOGGER.warning(f'skipping {granule_href}. no data in {current_assets}')
            return None

        # granule id is extracted from the data asset's filename via the collection's regex
        current_granule_id = re.findall(granule_id_extraction, os.path.basename(current_assets['data']))
        if len(current_granule_id) < 1:
            LOGGER.warning(f'skipping {granule_href}. cannot be matched to granule_id: {current_granule_id}')
            return None
        current_granule_id = current_granule_id[0]

        updating_assets = {}
        uploading_current_granule_stac = None
        for asset_type, asset_href in current_assets.items():
            LOGGER.debug(f'uploading {asset_type}, {asset_href}')
            s3_url = self.__s3.upload(asset_href, self.__staging_bucket, f'{self.__collection_id}:{current_granule_id}', self.__delete_files)
            if asset_href == granule_href:
                # remember the uploaded metadata file so it can be re-uploaded with rewritten hrefs
                uploading_current_granule_stac = s3_url
            updating_assets[asset_type] = s3_url
        self.__gc.update_assets_href(current_granule_stac, updating_assets)
        current_granule_stac.id = current_granule_id
        current_granule_stac.collection_id = self.__collection_id
        if uploading_current_granule_stac is not None:  # upload metadata file again
            self.__s3.set_s3_url(uploading_current_granule_stac)
            self.__s3.upload_bytes(json.dumps(current_granule_stac.to_dict(False, False)).encode())
        return {
            'id': f'{self.__collection_id}:{current_granule_id}',
            'collection': self.__collection_id,
            'assets': {k: v.to_dict() for k, v in current_granule_stac.assets.items()},
        }

    def upload(self, **kwargs) -> list:
        """Upload all granules listed in the catalog; return the DAPA granule dicts.

        Per-granule failures are logged and collected; they do not abort the run.
        """
        self.__set_props_from_env()
        # use the class constant instead of repeating the literal 'GRANULES_SEARCH_DOMAIN'
        self.__collection_details = SearchCollectionsFactory().get_class(os.getenv(self.GRANULES_SEARCH_DOMAIN, 'MISSING_GRANULES_SEARCH_DOMAIN')).search()
        self.__collection_details = json.loads(self.__collection_details)

        granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction'][0]
        child_links = self.__gc.get_child_link_hrefs(os.environ.get(self.CATALOG_FILE))
        errors = []
        dapa_body_granules = []
        for each_child in child_links:
            try:
                dapa_granule = self.__upload_single_granule(each_child, granule_id_extraction)
                if dapa_granule is not None:
                    dapa_body_granules.append(dapa_granule)
            except Exception as e:
                LOGGER.exception(f'error while processing: {each_child}')
                errors.append({'href': each_child, 'error': str(e)})

        if len(errors) > 0:
            LOGGER.error(f'some errors while uploading granules: {errors}')
        LOGGER.debug(f'dapa_body_granules: {dapa_body_granules}')
        return dapa_body_granules
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@


class UploadGranulesFactory:
    """Factory resolving an upload-type name to its granule-uploader implementation."""

    S3 = 'S3'
    CATALOG_S3 = 'CATALOG_S3'

    def get_class(self, upload_type):
        """Return an uploader instance for *upload_type*.

        Imports are deferred so only the chosen implementation's dependencies
        are loaded. Raises ValueError for an unknown type.
        """
        if upload_type == UploadGranulesFactory.S3:
            from cumulus_lambda_functions.stage_in_out.upload_granules_s3 import UploadGranulesS3
            return UploadGranulesS3()
        if upload_type == UploadGranulesFactory.CATALOG_S3:
            from cumulus_lambda_functions.stage_in_out.upload_granules_by_catalog_s3 import UploadGranulesByCatalogS3
            return UploadGranulesByCatalogS3()
        # was: 'unknown search_type' — copy-paste from the search factory; this is an upload type
        raise ValueError(f'unknown upload_type: {upload_type}')
5 changes: 3 additions & 2 deletions docker/stage-in-stage-out/dc-003-upload.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ services:
DELETE_FILES: 'FALSE'

GRANULES_SEARCH_DOMAIN: 'UNITY'
GRANULES_UPLOAD_TYPE: 'S3'
UPLOAD_DIR: '/etc/snpp_upload_test_1'
GRANULES_UPLOAD_TYPE: 'S3 or CATALOG_S3'
UPLOAD_DIR: '/etc/snpp_upload_test_1 or empty string'
CATALOG_FILE: 'empty string or /path/to/stac/catalog file'
LOG_LEVEL: '20'
networks:
- internal
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

setup(
name="cumulus_lambda_functions",
version="3.4.0",
version="3.6.0",
packages=find_packages(),
install_requires=install_requires,
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],
Expand Down
Loading