Skip to content

feat: add DAAC download logic #132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.4.0] - 2023-04-17
### Changed
- [#132](https://github.com/unity-sds/unity-data-services/pull/132) feat: add DAAC download logic

## [3.3.1] - 2023-04-13
### Changed
- [#136](https://github.com/unity-sds/unity-data-services/pull/136) fix: uncomment temporal in CMR granules search
Expand Down
3 changes: 2 additions & 1 deletion cumulus_lambda_functions/docker_entrypoint/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from sys import argv

from cumulus_lambda_functions.stage_in_out.catalog_granules_factory import CatalogGranulesFactory
from cumulus_lambda_functions.stage_in_out.download_granules_factory import DownloadGranulesFactory
from cumulus_lambda_functions.stage_in_out.download_granules_s3 import DownloadGranulesS3
from cumulus_lambda_functions.stage_in_out.search_granules_factory import SearchGranulesFactory
from cumulus_lambda_functions.stage_in_out.upoad_granules_factory import UploadGranulesFactory
Expand All @@ -14,7 +15,7 @@ def choose_process():
return SearchGranulesFactory().get_class(os.getenv('GRANULES_SEARCH_DOMAIN', 'MISSING_GRANULES_SEARCH_DOMAIN')).search()
if argv[1].strip().upper() == 'DOWNLOAD':
logging.info('starting DOWNLOAD script')
return DownloadGranulesS3().download()
return DownloadGranulesFactory().get_class(os.getenv('GRANULES_DOWNLOAD_TYPE', 'MISSING_GRANULES_DOWNLOAD_TYPE')).download()
if argv[1].strip().upper() == 'UPLOAD':
logging.info('starting UPLOAD script')
return UploadGranulesFactory().get_class(os.getenv('GRANULES_UPLOAD_TYPE', 'MISSING_GRANULES_UPLOAD_TYPE')).upload()
Expand Down
5 changes: 5 additions & 0 deletions cumulus_lambda_functions/lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ class Constants:
LOG_FORMAT = '%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
LOG_LEVEL = 'LOG_LEVEL'
UNITY_BEARER_TOKEN = 'UNITY_BEARER_TOKEN'
EDL_BEARER_TOKEN = 'EDL_BEARER_TOKEN'
CLIENT_ID = 'CLIENT_ID'
COGNITO_URL = 'COGNITO_URL'
EDL_BASE_URL = 'EDL_BASE_URL'
USERNAME = 'USERNAME'
PASSWORD = 'PASSWORD'
EDL_USERNAME = 'EDL_USERNAME'
EDL_PASSWORD = 'EDL_PASSWORD'
EDL_PASSWORD_TYPE = 'EDL_PASSWORD_TYPE'
VERIFY_SSL = 'VERIFY_SSL'
PASSWORD_TYPE = 'PASSWORD_TYPE'
PLAIN_STR = 'PLAIN'
Expand Down
Empty file.
116 changes: 116 additions & 0 deletions cumulus_lambda_functions/lib/earthdata_login/urs_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import json
import logging
from http.cookiejar import CookieJar
from typing import Dict
from urllib import request

import requests
from requests.auth import HTTPBasicAuth
from tenacity import retry, retry_if_result, stop_after_attempt, wait_random_exponential
LOGGER = logging.getLogger(__name__)


class URSToken(object):
    """Create, list, and revoke NASA Earthdata Login (URS) bearer tokens.

    Authenticates against the URS token API with HTTP basic auth using the
    supplied username and password. The base URL defaults to the production
    Earthdata Login endpoint and is normalized to the form 'https://<host>/'.
    """

    def __init__(self, username: str, dwssap: str, edl_base_url: str = None) -> None:
        """
        :param username: Earthdata Login username
        :param dwssap: Earthdata Login password
        :param edl_base_url: optional EDL base URL; production URS is used when
            None or empty
        """
        super().__init__()
        self.__default_edl_base_url = 'https://urs.earthdata.nasa.gov/'
        self.__username = username
        self.__dwssap = dwssap
        # Fall back to production URS when no URL is given; normalize to
        # lower-case with a trailing '/' and an explicit scheme.
        self.__edl_base_url = self.__default_edl_base_url if edl_base_url is None or edl_base_url == '' else edl_base_url.strip().lower()
        if not self.__edl_base_url.endswith('/'):
            self.__edl_base_url = f'{self.__edl_base_url}/'
        if not self.__edl_base_url.startswith('http'):
            self.__edl_base_url = f'https://{self.__edl_base_url}'
        self.__token = None

    @retry(wait=wait_random_exponential(multiplier=1, max=60),
           stop=stop_after_attempt(3),
           reraise=True,
           retry=(retry_if_result(lambda x: x == ''))
           )
    def create_token(self, url: str) -> str:
        """POST to <url>/token to mint a new EDL access token.

        Retried (up to 3 attempts, random exponential backoff) whenever an
        empty string is returned, i.e. on any failure. If URS reports the
        max-token limit, the first existing token is reused instead.

        :param url: token API base, e.g. '<edl_base_url>api/users'
        :return: the token string, or '' when every attempt failed
        """
        token: str = ''
        try:
            headers: Dict = {'Accept': 'application/json'}  # noqa E501
            resp = requests.post(url + "/token", headers=headers, auth=HTTPBasicAuth(self.__username, self.__dwssap))
            # BUGFIX: original raised on `status_code < 300`, i.e. on every
            # *successful* response. An error response is any status >= 300.
            if resp.status_code >= 300:
                raise ValueError(f'invalid response code: {resp.status_code}. details: {resp.content}')
            response_content: Dict = json.loads(resp.content)
            if "error" in response_content:
                if response_content["error"] == "max_token_limit":
                    LOGGER.error("Max tokens acquired from URS. Using existing token")
                    tokens = self.list_tokens(url)
                    return tokens[0]
            if 'access_token' not in response_content:
                raise ValueError(f'access_token not found. {response_content}')
            token = response_content['access_token']

            # Add better error handling there
            # Max tokens
            # Wrong Username/Password
            # Other
        except Exception:
            # Best-effort: swallow the error and return '' so the @retry
            # decorator triggers another attempt.
            LOGGER.warning("Error getting the token - check user name and password", exc_info=True)
        return token

    def list_tokens(self, url: str):
        """GET <url>/tokens and return the user's existing access tokens.

        :param url: token API base, e.g. '<edl_base_url>api/users'
        :return: list of token strings; empty list on any error
        """
        tokens = []
        try:
            headers: Dict = {'Accept': 'application/json'}  # noqa E501
            resp = requests.get(url + "/tokens", headers=headers, auth=HTTPBasicAuth(self.__username, self.__dwssap))
            if resp.status_code >= 400:
                LOGGER.error(f'error response: {resp.status_code}. details: {resp.content}')
                return tokens
            response_content = json.loads(resp.content)

            for x in response_content:
                tokens.append(x['access_token'])

        except Exception:
            LOGGER.warning("Error getting the token - check user name and password", exc_info=True)
        return tokens

    def setup_earthdata_login_auth(self):
        """
        Set up the request library so that it authenticates against the given
        Earthdata Login endpoint and is able to track cookies between requests.
        This looks in the .netrc file first and if no credentials are found,
        it prompts for them.
        Valid endpoints include:
            urs.earthdata.nasa.gov - Earthdata Login production
        """
        manager = request.HTTPPasswordMgrWithDefaultRealm()
        manager.add_password(None, self.__edl_base_url, self.__username, self.__dwssap)
        auth = request.HTTPBasicAuthHandler(manager)
        __version__ = "1.12.0"

        jar = CookieJar()
        processor = request.HTTPCookieProcessor(jar)
        opener = request.build_opener(auth, processor)
        opener.addheaders = [('User-agent', 'unity-downloader-' + __version__)]
        # Installs globally: all subsequent urllib.request calls use this opener.
        request.install_opener(opener)

    def delete_token(self, token: str) -> bool:
        """Revoke the given token via the EDL revoke_token endpoint.

        :param token: the access token to revoke
        :return: True when EDL confirmed deletion (HTTP 200), False otherwise
        """
        try:
            self.setup_earthdata_login_auth()
            headers: Dict = {'Accept': 'application/json'}
            resp = requests.post(f'{self.__edl_base_url}revoke_token', params={"token": token}, headers=headers,
                                 auth=HTTPBasicAuth(self.__username, self.__dwssap))

            if resp.status_code == 200:
                LOGGER.info("EDL token successfully deleted")
                return True
            else:
                LOGGER.info("EDL token deleting failed.")

        except Exception:
            LOGGER.warning("Error deleting the token", exc_info=True)
        return False

    def get_token(self) -> str:
        """Return an existing EDL token if one exists, else create one."""
        token_url = f'{self.__edl_base_url}api/users'
        tokens = self.list_tokens(token_url)
        if len(tokens) == 0:
            return self.create_token(token_url)
        else:
            return tokens[0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import base64
import logging
import os

from cumulus_lambda_functions.lib.aws.aws_param_store import AwsParamStore
from cumulus_lambda_functions.lib.constants import Constants
from cumulus_lambda_functions.lib.earthdata_login.urs_token import URSToken

LOGGER = logging.getLogger(__name__)


class URSTokenRetriever:
    """Resolve EDL credentials from the environment and obtain a URS token.

    Resolution order:
      1. EDL_BEARER_TOKEN present in the environment -> returned as-is.
      2. EDL_USERNAME / EDL_PASSWORD, interpreted per EDL_PASSWORD_TYPE
         (PLAIN by default, BASE64-encoded, or AWS Param Store paths), then
         exchanged for a token via URSToken.
    """

    def __get_username_dwssap(self):
        """Return the (username, password) pair, decoded per EDL_PASSWORD_TYPE.

        :raises KeyError: if EDL_USERNAME or EDL_PASSWORD is not set
        :raises ValueError: on an unknown EDL_PASSWORD_TYPE, or when Param
            Store returns NULL for either value
        """
        username = os.environ[Constants.EDL_USERNAME]
        dwssap = os.environ[Constants.EDL_PASSWORD]
        if Constants.EDL_PASSWORD_TYPE not in os.environ:
            # BUGFIX: log message referenced PASSWORD_TYPE; the env var
            # actually checked here is EDL_PASSWORD_TYPE.
            LOGGER.debug('no EDL_PASSWORD_TYPE set in ENV. assuming PLAIN STR')
            return username, dwssap
        dwssap_type = os.environ[Constants.EDL_PASSWORD_TYPE]
        if dwssap_type == Constants.PLAIN_STR:
            LOGGER.debug('PLAIN_STR in ENV. returning PLAIN STR')
            return username, dwssap
        if dwssap_type == Constants.BASE64_STR:
            LOGGER.debug('BASE64_STR in ENV. decoding & returning')
            username = base64.standard_b64decode(username.encode('utf-8')).decode('utf-8')
            dwssap = base64.standard_b64decode(dwssap.encode('utf-8')).decode('utf-8')
            return username, dwssap
        if dwssap_type == Constants.PARAM_STORE:
            # In this mode the env vars hold Param Store *paths*, not values.
            LOGGER.debug('PARAM_STORE in ENV. retrieving value from Param Store')
            username_param_store = AwsParamStore().get_param(username)
            dwssap_param_store = AwsParamStore().get_param(dwssap)
            if username_param_store is None or dwssap_param_store is None:
                raise ValueError(f'NULL username or password from Param Store. Set the value in {username} AND {dwssap}')
            return username_param_store, dwssap_param_store
        # BUGFIX: error referenced Constants.PASSWORD_TYPE; the value being
        # validated came from Constants.EDL_PASSWORD_TYPE.
        raise ValueError(f'invalid {Constants.EDL_PASSWORD_TYPE}. value: {dwssap_type}')

    def start(self):
        """Return an EDL bearer token, performing a login only when needed.

        :raises ValueError: if EDL_USERNAME or EDL_PASSWORD is missing and no
            EDL_BEARER_TOKEN is provided
        """
        if Constants.EDL_BEARER_TOKEN in os.environ:
            # BUGFIX: message said UNITY_BEARER_TOKEN; EDL_BEARER_TOKEN is
            # what is returned here.
            LOGGER.debug('found EDL_BEARER_TOKEN. returning EDL_BEARER_TOKEN from ENV')
            return os.environ[Constants.EDL_BEARER_TOKEN]
        LOGGER.debug('EDL_BEARER_TOKEN not found. preparing to login')

        missing_mandatory_env = [k for k in [Constants.EDL_USERNAME, Constants.EDL_PASSWORD] if k not in os.environ]
        if len(missing_mandatory_env) > 0:
            raise ValueError(f'missing mandatory ENV for login: {missing_mandatory_env}')
        username, dwssap = self.__get_username_dwssap()
        urs_token = URSToken(username, dwssap, os.environ.get(Constants.EDL_BASE_URL, ''))
        token = urs_token.get_token()
        return token
103 changes: 103 additions & 0 deletions cumulus_lambda_functions/stage_in_out/download_granules_daac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import requests

from cumulus_lambda_functions.lib.earthdata_login.urs_token_retriever import URSTokenRetriever
from cumulus_lambda_functions.stage_in_out.download_granules_abstract import DownloadGranulesAbstract
import json
import logging
import os

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranulesDAAC(DownloadGranulesAbstract):
    """Download granule assets from a DAAC over HTTPS with an EDL bearer token.

    Reads a STAC-style granules JSON array and a target directory from the
    environment, obtains an Earthdata Login token via URSTokenRetriever, and
    downloads every asset 'href' into the download directory. Per-asset
    failures are collected into <download_dir>/error.log instead of aborting
    the whole run.
    """
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'  # env var: local directory to write files into
    STAC_JSON = 'STAC_JSON'  # env var: JSON array of granule feature dicts

    def __init__(self) -> None:
        super().__init__()
        self.__download_dir = '/tmp'
        # NOTE(review): __s3 is never used in this class — candidate for removal.
        self.__s3 = AwsS3()
        self.__granules_json = []
        self.__edl_token = None

    def __set_props_from_env(self):
        """Validate and load required env vars, then obtain an EDL token.

        :raises ValueError: if STAC_JSON or DOWNLOAD_DIR is missing
        :return: self (fluent)
        """
        missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')
        self.__granules_json = json.loads(os.environ.get(self.STAC_JSON))
        self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
        # Normalize: strip a single trailing '/' so os.path.join stays clean.
        self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
        self.__edl_token = URSTokenRetriever().start()
        return self

    def __get_downloading_urls(self, granules_result: list):
        """Extract each granule's 'assets' dict from the granules JSON list."""
        if len(granules_result) < 1:
            # fixed: was an f-string with no placeholders
            LOGGER.warning('cannot find any granules')
            return []
        downloading_urls = [k['assets'] for k in granules_result]
        return downloading_urls

    def __download_one_granule(self, assets: dict):
        """
        Download every asset href of one granule into the download directory.

        sample assets
          {
            "data": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS",
              "title": "P1570515ATMSSCIENCEAAT16017044853900.PDS",
              "description": "P1570515ATMSSCIENCEAAT16017044853900.PDS"
            },
            "metadata__data": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS",
              "title": "P1570515ATMSSCIENCEAAT16017044853901.PDS",
              "description": "P1570515ATMSSCIENCEAAT16017044853901.PDS"
            },
            "metadata__xml": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
              "title": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
              "description": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml"
            },
            "metadata__cmr": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
              "title": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
              "description": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml"
            }
          }
        :param assets: mapping of asset name -> {'href', 'title', 'description'}
        :return: list of asset dicts that failed, each annotated with 'cause'
        """
        error_log = []
        headers = {
            'Authorization': f'Bearer {self.__edl_token}'
        }
        for k, v in assets.items():
            try:
                LOGGER.debug(f'downloading: {v["href"]}')
                r = requests.get(v['href'], headers=headers)
                if r.status_code >= 400:
                    raise RuntimeError(f'wrong response status: {r.status_code}. details: {r.content}')
                # TODO. how to correctly check redirecting to login page
                with open(os.path.join(self.__download_dir, os.path.basename(v["href"])), 'wb') as fd:
                    fd.write(r.content)
            except Exception as e:
                # Record the failure and keep going with the remaining assets.
                LOGGER.exception(f'failed to download {v}')
                v['cause'] = str(e)
                error_log.append(v)
        return error_log

    def download(self, **kwargs) -> list:
        """Download all granules; write failures to <download_dir>/error.log.

        :return: the list of per-granule asset dicts that were attempted
        """
        self.__set_props_from_env()
        LOGGER.debug(f'creating download dir: {self.__download_dir}')
        FileUtils.mk_dir_p(self.__download_dir)
        downloading_urls = self.__get_downloading_urls(self.__granules_json)
        error_list = []
        for each in downloading_urls:
            LOGGER.debug(f'working on {each}')
            current_error_list = self.__download_one_granule(each)
            error_list.extend(current_error_list)
        if len(error_list) > 0:
            with open(f'{self.__download_dir}/error.log', 'w') as error_file:
                error_file.write(json.dumps(error_list, indent=4))
        return downloading_urls
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@


class DownloadGranulesFactory:
    """Factory that maps a download-type string to a downloader instance."""
    S3 = 'S3'
    DAAC = 'DAAC'

    def get_class(self, search_type):
        """Return a downloader for *search_type* ('S3' or 'DAAC').

        Imports are deferred to the matching branch so that choosing one
        backend does not require the other's dependencies.

        :raises ValueError: for any unrecognized search_type
        """
        if search_type == self.S3:
            from cumulus_lambda_functions.stage_in_out.download_granules_s3 import DownloadGranulesS3
            return DownloadGranulesS3()
        if search_type == self.DAAC:
            from cumulus_lambda_functions.stage_in_out.download_granules_daac import DownloadGranulesDAAC
            return DownloadGranulesDAAC()
        raise ValueError(f'unknown search_type: {search_type}')
5 changes: 5 additions & 0 deletions docker/stage-in-stage-out/dc-002-download.yaml

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'pystac', 'jsonschema',
'fastjsonschema',
'xmltodict',
'tenacity',
'requests'
]

Expand All @@ -17,7 +18,7 @@

setup(
name="cumulus_lambda_functions",
version="3.3.1",
version="3.4.0",
packages=find_packages(),
install_requires=install_requires,
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os
from unittest import TestCase

from cumulus_lambda_functions.lib.constants import Constants
from cumulus_lambda_functions.lib.earthdata_login.urs_token_retriever import URSTokenRetriever


class TestURSTokenRetriever(TestCase):
    """Integration test for URSTokenRetriever — not a self-contained unit test.

    NOTE(review): this test hits live services: it resolves credentials from
    AWS SSM Parameter Store and logs in against Earthdata Login. It needs
    valid AWS credentials and network access — confirm before wiring into CI.
    """
    def test_01(self):
        # EDL_USERNAME / EDL_PASSWORD hold Parameter Store *paths* here, not
        # plain values; EDL_PASSWORD_TYPE=PARAM_STORE tells the retriever to
        # resolve them via AwsParamStore.
        os.environ[Constants.EDL_USERNAME] = '/unity/uds/user/wphyo/edl_username'
        os.environ[Constants.EDL_PASSWORD] = '/unity/uds/user/wphyo/edl_dwssap'
        # os.environ[Constants.USERNAME] = 'usps_username'
        # os.environ[Constants.PASSWORD] = 'usps_password'
        os.environ[Constants.EDL_PASSWORD_TYPE] = Constants.PARAM_STORE
        # Scheme-less base URL: URSToken normalizes it to 'https://.../'.
        os.environ[Constants.EDL_BASE_URL] = 'urs.earthdata.nasa.gov'
        result = URSTokenRetriever().start()
        # start() should yield a non-empty bearer token string.
        self.assertTrue(len(result) > 0, 'empty token')
        return
42 changes: 42 additions & 0 deletions tests/integration_tests/test_docker_entry.py

Large diffs are not rendered by default.