Skip to content

breaking: split stage-in and stage-out steps #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
.idea
.eggs
build
scratch*
local*
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0] - 2023-01-23
### Breaking
- [#120](https://github.com/unity-sds/unity-data-services/pull/120) break up the upload and download dockers into search + download and upload + catalog

## [1.10.1] - 2023-01-23
### Fixed
- [#112](https://github.com/unity-sds/unity-data-services/pull/112) update dockerfile base images
Expand Down

This file was deleted.

Empty file.
24 changes: 15 additions & 9 deletions cumulus_lambda_functions/docker_entrypoint/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@
import os
from sys import argv

from cumulus_lambda_functions.cumulus_download_granules.download_granules import DownloadGranules
from cumulus_lambda_functions.cumulus_upload_granules.upload_granules import UploadGranules
from cumulus_lambda_functions.stage_in_out.catalog_granules_factory import CatalogGranulesFactory
from cumulus_lambda_functions.stage_in_out.download_granules_s3 import DownloadGranulesS3
from cumulus_lambda_functions.stage_in_out.search_granules_factory import SearchGranulesFactory
from cumulus_lambda_functions.stage_in_out.upoad_granules_factory import UploadGranulesFactory


def choose_process():
    """Dispatch to one stage-in/stage-out step based on the first CLI argument.

    Supported actions (case-insensitive): SEARCH, DOWNLOAD, UPLOAD, CATALOG.
    Returns the chosen step's result so the caller can print/capture it.

    Raises:
        ValueError: when argv[1] is not one of the supported actions.
    """
    # Hoist the repeated normalization of the action argument.
    action = argv[1].strip().upper()
    if action == 'SEARCH':
        logging.info('starting SEARCH script')
        return SearchGranulesFactory().get_class(os.getenv('GRANULES_SEARCH_DOMAIN', 'MISSING_GRANULES_SEARCH_DOMAIN')).search()
    if action == 'DOWNLOAD':
        logging.info('starting DOWNLOAD script')
        return DownloadGranulesS3().download()
    if action == 'UPLOAD':
        logging.info('starting UPLOAD script')
        return UploadGranulesFactory().get_class(os.getenv('GRANULES_UPLOAD_TYPE', 'MISSING_GRANULES_UPLOAD_TYPE')).upload()
    if action == 'CATALOG':
        logging.info('starting CATALOG script')
        return CatalogGranulesFactory().get_class(os.getenv('GRANULES_CATALOG_TYPE', 'MISSING_GRANULES_CATALOG_TYPE')).catalog()
    raise ValueError(f'invalid argument: {argv}')


if __name__ == '__main__':
    logging.basicConfig(level=int(os.environ.get('LOG_LEVEL', '10')),
                        format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
    # Print the chosen step's result so calling workflows can capture it from stdout.
    print(choose_process())
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class CatalogGranulesAbstract(ABC):
    """Interface for cataloging already-uploaded granule files into a data store."""
    @abstractmethod
    def catalog(self, **kwargs):
        # Concrete subclasses register the uploaded granules and return the ingest result.
        raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class CatalogGranulesFactory:
    """Factory returning a CatalogGranulesAbstract implementation for a type name."""

    UNITY = 'UNITY'

    def get_class(self, upload_type):
        """Return the catalog implementation for *upload_type*.

        Raises:
            ValueError: when *upload_type* is not a known catalog type.
        """
        if upload_type == CatalogGranulesFactory.UNITY:
            # Imported lazily so unrelated implementations are not loaded.
            from cumulus_lambda_functions.stage_in_out.catalog_granules_unity import CatalogGranulesUnity
            return CatalogGranulesUnity()
        # Fix: message previously said 'search_type' — copy/paste from the search factory.
        raise ValueError(f'unknown catalog_type: {upload_type}')
39 changes: 39 additions & 0 deletions cumulus_lambda_functions/stage_in_out/catalog_granules_unity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.stage_in_out.catalog_granules_abstract import CatalogGranulesAbstract
import json
import logging
import os

LOGGER = logging.getLogger(__name__)


class CatalogGranulesUnity(CatalogGranulesAbstract):
    """Catalog uploaded granules by submitting them to the Unity DAPA ingest endpoint."""

    PROVIDER_ID_KEY = 'PROVIDER_ID'
    UPLOADED_FILES_JSON = 'UPLOADED_FILES_JSON'
    VERIFY_SSL_KEY = 'VERIFY_SSL'

    def __init__(self) -> None:
        super().__init__()
        # Populated from environment variables by __set_props_from_env().
        self.__provider_id = ''
        self.__uploaded_files_json = None
        self.__verify_ssl = True

    def __set_props_from_env(self):
        """Load provider id, uploaded-files JSON, and SSL-verify flag from the environment.

        Raises:
            ValueError: when a mandatory environment key is absent.
        """
        required = [self.UPLOADED_FILES_JSON, self.PROVIDER_ID_KEY]
        missing_keys = [each_key for each_key in required if each_key not in os.environ]
        if missing_keys:
            raise ValueError(f'missing environment keys: {missing_keys}')
        self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
        self.__uploaded_files_json = json.loads(os.environ.get(self.UPLOADED_FILES_JSON))
        # Any value other than TRUE (case-insensitive) disables SSL verification.
        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        return self

    def catalog(self, **kwargs):
        """Send the uploaded files to DAPA as a CNM ingest request and return its result."""
        self.__set_props_from_env()
        dapa_body = {
            "provider_id": self.__provider_id,
            "features": self.__uploaded_files_json
        }
        dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
        LOGGER.debug(f'dapa_body_granules: {dapa_body}')
        return dapa_client.ingest_granules_w_cnm(dapa_body)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class DownloadGranulesAbstract(ABC):
    """Interface for downloading granule files to local storage."""
    @abstractmethod
    def download(self, **kwargs) -> list:
        # Concrete subclasses fetch the granule assets and return a list of results.
        raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class DownloadGranulesFactory:
    """Factory returning a DownloadGranulesAbstract implementation for a type name."""

    S3 = 'S3'

    def get_class(self, search_type):
        """Return the download implementation for *search_type*; raise ValueError if unknown."""
        if search_type == DownloadGranulesFactory.S3:
            # Imported lazily so unrelated implementations are not loaded.
            from cumulus_lambda_functions.stage_in_out.download_granules_s3 import DownloadGranulesS3
            return DownloadGranulesS3()
        raise ValueError(f'unknown search_type: {search_type}')
Original file line number Diff line number Diff line change
@@ -1,48 +1,31 @@
from cumulus_lambda_functions.stage_in_out.download_granules_abstract import DownloadGranulesAbstract
import json
import logging
import os

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranules:
COLLECTION_ID_KEY = 'COLLECTION_ID'
class DownloadGranulesS3(DownloadGranulesAbstract):
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
STAC_JSON = 'STAC_JSON'

LIMITS_KEY = 'LIMITS'
DATE_FROM_KEY = 'DATE_FROM'
DATE_TO_KEY = 'DATE_TO'
VERIFY_SSL_KEY = 'VERIFY_SSL'

def __init__(self):
self.__collection_id = ''
self.__date_from = ''
self.__date_to = ''
self.__limit = 1000
def __init__(self) -> None:
super().__init__()
self.__download_dir = '/tmp'
self.__verify_ssl = True
self.__s3 = AwsS3()
self.__granules_json = []

def __set_props_from_env(self):
missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')

self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
self.__granules_json = json.loads(os.environ.get(self.STAC_JSON))
self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
if self.LIMITS_KEY not in os.environ:
LOGGER.warning(f'missing {self.LIMITS_KEY}. using default: {self.__limit}')
else:
self.__limit = int(os.environ.get(self.LIMITS_KEY))

self.__date_from = os.environ.get(self.DATE_FROM_KEY, '')
self.__date_to = os.environ.get(self.DATE_TO_KEY, '')
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
return self

def __get_downloading_urls(self, granules_result: list):
Expand Down Expand Up @@ -91,13 +74,11 @@ def __download_one_granule(self, assets: dict):
error_log.append(v)
return error_log

def start(self):
def download(self, **kwargs) -> list:
self.__set_props_from_env()
LOGGER.debug(f'creating download dir: {self.__download_dir}')
FileUtils.mk_dir_p(self.__download_dir)
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
granules_result = dapa_client.get_granules(self.__collection_id, self.__limit, 0, self.__date_from, self.__date_to)
downloading_urls = self.__get_downloading_urls(granules_result)
downloading_urls = self.__get_downloading_urls(self.__granules_json)
error_list = []
for each in downloading_urls:
LOGGER.debug(f'working on {each}')
Expand All @@ -106,4 +87,4 @@ def start(self):
if len(error_list) > 0:
with open(f'{self.__download_dir}/error.log', 'w') as error_file:
error_file.write(json.dumps(error_list, indent=4))
return
return downloading_urls
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class SearchCollectionsAbstract(ABC):
    """Interface for searching collections in a metadata catalog (e.g. CMR, Unity)."""
    @abstractmethod
    def search(self, **kwargs):
        # Concrete subclasses return the matching collection(s).
        raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from cumulus_lambda_functions.stage_in_out.search_collections_abstract import SearchCollectionsAbstract


class SearchCollectionsCmr(SearchCollectionsAbstract):
    """Placeholder: CMR collection search is not implemented yet."""
    def search(self, **kwargs):
        # Deliberate stub — callers selecting CMR will get NotImplementedError.
        raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@


class SearchCollectionsFactory:
    """Factory returning a SearchCollectionsAbstract implementation for a domain name."""

    CMR = 'CMR'
    UNITY = 'UNITY'

    def get_class(self, search_type):
        """Return the collection-search implementation for *search_type*."""
        # Branches are mutually exclusive equality checks, so order is irrelevant.
        if search_type == SearchCollectionsFactory.UNITY:
            from cumulus_lambda_functions.stage_in_out.search_collections_unity import SearchCollectionsUnity
            return SearchCollectionsUnity()
        if search_type == SearchCollectionsFactory.CMR:
            from cumulus_lambda_functions.stage_in_out.search_collections_cmr import SearchCollectionsCmr
            return SearchCollectionsCmr()
        raise ValueError(f'unknown search_type: {search_type}')
48 changes: 48 additions & 0 deletions cumulus_lambda_functions/stage_in_out/search_collections_unity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import json
import logging
import os

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.stage_in_out.search_collections_abstract import SearchCollectionsAbstract

LOGGER = logging.getLogger(__name__)


class SearchCollectionsUnity(SearchCollectionsAbstract):
    """Search for a collection via the Unity DAPA endpoint and return it as JSON text."""

    COLLECTION_ID_KEY = 'COLLECTION_ID'
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'

    LIMITS_KEY = 'LIMITS'
    DATE_FROM_KEY = 'DATE_FROM'
    DATE_TO_KEY = 'DATE_TO'
    VERIFY_SSL_KEY = 'VERIFY_SSL'

    def __init__(self) -> None:
        super().__init__()
        # Populated from environment variables by __set_props_from_env().
        self.__collection_id = ''
        self.__date_from = ''
        self.__date_to = ''
        self.__limit = 1000
        self.__verify_ssl = True

    def __set_props_from_env(self):
        """Load search settings from the environment; COLLECTION_ID is mandatory.

        Raises:
            ValueError: when COLLECTION_ID is absent.
        """
        if self.COLLECTION_ID_KEY not in os.environ:
            missing_keys = [self.COLLECTION_ID_KEY]
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        raw_limit = os.environ.get(self.LIMITS_KEY)
        if raw_limit is None:
            LOGGER.warning(f'missing {self.LIMITS_KEY}. using default: {self.__limit}')
        else:
            self.__limit = int(raw_limit)

        self.__date_from = os.environ.get(self.DATE_FROM_KEY, '')
        self.__date_to = os.environ.get(self.DATE_TO_KEY, '')
        # Any value other than TRUE (case-insensitive) disables SSL verification.
        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        return self

    def search(self, **kwargs):
        """Fetch the configured collection from DAPA and return it serialized to JSON."""
        self.__set_props_from_env()
        dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
        collections_result = dapa_client.get_collection(self.__collection_id)
        return json.dumps(collections_result)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from abc import ABC, abstractmethod


class SearchGranulesAbstract(ABC):
    """Interface for searching granules in a metadata catalog."""
    @abstractmethod
    def search(self, **kwargs) -> list:
        # NOTE(review): the -> list annotation may be stale — SearchGranulesCmr.search
        # returns a JSON string; confirm the intended contract with callers.
        raise NotImplementedError()
77 changes: 77 additions & 0 deletions cumulus_lambda_functions/stage_in_out/search_granules_cmr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import json
import logging
import os

import requests

from cumulus_lambda_functions.stage_in_out.search_granules_abstract import SearchGranulesAbstract

LOGGER = logging.getLogger(__name__)


class SearchGranulesCmr(SearchGranulesAbstract):
    """Search granules in NASA CMR and return the matching STAC features as JSON text."""

    CMR_BASE_URL_KEY = 'CMR_BASE_URL'
    COLLECTION_ID_KEY = 'COLLECTION_ID'
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'

    LIMITS_KEY = 'LIMITS'
    DATE_FROM_KEY = 'DATE_FROM'
    DATE_TO_KEY = 'DATE_TO'
    VERIFY_SSL_KEY = 'VERIFY_SSL'

    def __init__(self) -> None:
        super().__init__()
        # Populated from environment variables by __set_props_from_env().
        self.__collection_id = ''
        self.__date_from = ''
        self.__date_to = ''
        self.__limit = 1000
        self.__verify_ssl = True
        self.__cmr_base_url = ''

    def __set_props_from_env(self):
        """Load CMR search settings; COLLECTION_ID and CMR_BASE_URL are mandatory.

        Raises:
            ValueError: when a mandatory environment key is absent.
        """
        missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.CMR_BASE_URL_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__cmr_base_url = os.environ.get(self.CMR_BASE_URL_KEY)
        # Normalize so the search path can be appended directly.
        if not self.__cmr_base_url.endswith('/'):
            self.__cmr_base_url = f'{self.__cmr_base_url}/'
        if self.LIMITS_KEY not in os.environ:
            LOGGER.warning(f'missing {self.LIMITS_KEY}. using default: {self.__limit}')
        else:
            self.__limit = int(os.environ.get(self.LIMITS_KEY))

        self.__date_from = os.environ.get(self.DATE_FROM_KEY, '')
        self.__date_to = os.environ.get(self.DATE_TO_KEY, '')
        # Any value other than TRUE (case-insensitive) disables SSL verification.
        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        return self

    def search(self, **kwargs) -> str:
        """Query CMR's STAC granule-search endpoint and return the features as JSON text.

        Equivalent curl:
        curl 'https://cmr.earthdata.nasa.gov/search/granules.stac' \
        -H 'accept: application/json; profile=stac-catalogue' \
        -H 'content-type: application/x-www-form-urlencoded' \
        --data-raw 'collection_concept_id=C1649553296-PODAAC&page_num=1&page_size=20&temporal[]=2011-08-01T00:00:00,2011-09-01T00:00:00'

        Raises:
            RuntimeError: when CMR responds with an HTTP error status.
        """
        self.__set_props_from_env()
        header = {
            'accept': 'application/json; profile=stac-catalogue',
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        request_body = {
            'collection_concept_id': self.__collection_id,
            'page_num': '1',
            'page_size': str(self.__limit),
            'temporal[]': f'{self.__date_from},{self.__date_to}'
        }
        cmr_granules_url = f'{self.__cmr_base_url}search/granules.stac'
        response = requests.post(url=cmr_granules_url, headers=header, verify=self.__verify_ssl,
                                 data=request_body)
        # Fix: was `> 400`, which let HTTP 400 fall through to JSON parsing; 4xx/5xx are all errors.
        if response.status_code >= 400:
            # Fix: message previously blamed 'Cognito' — this request goes to CMR.
            raise RuntimeError(
                f'CMR search ends in error. status_code: {response.status_code}. url: {cmr_granules_url}. details: {response.text}')
        response = json.loads(response.content.decode('utf-8'))
        return json.dumps(response['features'])
13 changes: 13 additions & 0 deletions cumulus_lambda_functions/stage_in_out/search_granules_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@


class SearchGranulesFactory:
    """Factory returning a SearchGranulesAbstract implementation for a domain name."""

    CMR = 'CMR'
    UNITY = 'UNITY'

    def get_class(self, search_type):
        """Return the granule-search implementation for *search_type*."""
        # Branches are mutually exclusive equality checks, so order is irrelevant.
        if search_type == SearchGranulesFactory.UNITY:
            from cumulus_lambda_functions.stage_in_out.search_granules_unity import SearchGranulesUnity
            return SearchGranulesUnity()
        if search_type == SearchGranulesFactory.CMR:
            from cumulus_lambda_functions.stage_in_out.search_granules_cmr import SearchGranulesCmr
            return SearchGranulesCmr()
        raise ValueError(f'unknown search_type: {search_type}')
Loading