Skip to content

Commit 2e2fee1

Browse files
authored
Merge pull request #132 from unity-sds/daac-download
feat: add DAAC download logic
2 parents 3375bce + a0ae495 commit 2e2fee1

File tree

13 files changed

+352
-2
lines changed

13 files changed

+352
-2
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [3.4.0] - 2023-04-17
9+
### Changed
10+
- [#132](https://github.com/unity-sds/unity-data-services/pull/132) feat: add DAAC download logic
11+
812
## [3.3.1] - 2023-04-13
913
### Changed
1014
- [#136](https://github.com/unity-sds/unity-data-services/pull/136) fix: uncomment temporal in CMR granules search

cumulus_lambda_functions/docker_entrypoint/__main__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from sys import argv
44

55
from cumulus_lambda_functions.stage_in_out.catalog_granules_factory import CatalogGranulesFactory
6+
from cumulus_lambda_functions.stage_in_out.download_granules_factory import DownloadGranulesFactory
67
from cumulus_lambda_functions.stage_in_out.download_granules_s3 import DownloadGranulesS3
78
from cumulus_lambda_functions.stage_in_out.search_granules_factory import SearchGranulesFactory
89
from cumulus_lambda_functions.stage_in_out.upoad_granules_factory import UploadGranulesFactory
@@ -14,7 +15,7 @@ def choose_process():
1415
return SearchGranulesFactory().get_class(os.getenv('GRANULES_SEARCH_DOMAIN', 'MISSING_GRANULES_SEARCH_DOMAIN')).search()
1516
if argv[1].strip().upper() == 'DOWNLOAD':
1617
logging.info('starting DOWNLOAD script')
17-
return DownloadGranulesS3().download()
18+
return DownloadGranulesFactory().get_class(os.getenv('GRANULES_DOWNLOAD_TYPE', 'MISSING_GRANULES_DOWNLOAD_TYPE')).download()
1819
if argv[1].strip().upper() == 'UPLOAD':
1920
logging.info('starting UPLOAD script')
2021
return UploadGranulesFactory().get_class(os.getenv('GRANULES_UPLOAD_TYPE', 'MISSING_GRANULES_UPLOAD_TYPE')).upload()

cumulus_lambda_functions/lib/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@ class Constants:
22
LOG_FORMAT = '%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s'
33
LOG_LEVEL = 'LOG_LEVEL'
44
UNITY_BEARER_TOKEN = 'UNITY_BEARER_TOKEN'
5+
EDL_BEARER_TOKEN = 'EDL_BEARER_TOKEN'
56
CLIENT_ID = 'CLIENT_ID'
67
COGNITO_URL = 'COGNITO_URL'
8+
EDL_BASE_URL = 'EDL_BASE_URL'
79
USERNAME = 'USERNAME'
810
PASSWORD = 'PASSWORD'
11+
EDL_USERNAME = 'EDL_USERNAME'
12+
EDL_PASSWORD = 'EDL_PASSWORD'
13+
EDL_PASSWORD_TYPE = 'EDL_PASSWORD_TYPE'
914
VERIFY_SSL = 'VERIFY_SSL'
1015
PASSWORD_TYPE = 'PASSWORD_TYPE'
1116
PLAIN_STR = 'PLAIN'

cumulus_lambda_functions/lib/earthdata_login/__init__.py

Whitespace-only changes.
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import json
2+
import logging
3+
from http.cookiejar import CookieJar
4+
from typing import Dict
5+
from urllib import request
6+
7+
import requests
8+
from requests.auth import HTTPBasicAuth
9+
from tenacity import retry, retry_if_result, stop_after_attempt, wait_random_exponential
10+
LOGGER = logging.getLogger(__name__)
11+
12+
13+
class URSToken(object):
    """Acquire, list, and revoke NASA Earthdata Login (URS) user tokens.

    Talks to the URS ``api/users`` endpoints using HTTP Basic authentication
    with the supplied username/password. All network failures are logged and
    degraded to empty results; ``create_token`` is retried (via tenacity) when
    it yields an empty token.
    """

    def __init__(self, username: str, dwssap: str, edl_base_url: str = None) -> None:
        """
        :param username: EDL username
        :param dwssap: EDL password
        :param edl_base_url: base URL of the EDL service; falls back to
            production URS when None or empty
        """
        super().__init__()
        self.__default_edl_base_url = 'https://urs.earthdata.nasa.gov/'
        self.__username = username
        self.__dwssap = dwssap
        self.__edl_base_url = self.__default_edl_base_url if edl_base_url is None or edl_base_url == '' else edl_base_url.strip().lower()
        # normalize to the 'https://host/.../' shape the endpoint builders below expect
        if not self.__edl_base_url.endswith('/'):
            self.__edl_base_url = f'{self.__edl_base_url}/'
        if not self.__edl_base_url.startswith('http'):
            self.__edl_base_url = f'https://{self.__edl_base_url}'
        self.__token = None

    @retry(wait=wait_random_exponential(multiplier=1, max=60),
           stop=stop_after_attempt(3),
           reraise=True,
           retry=(retry_if_result(lambda x: x == ''))
           )
    def create_token(self, url: str) -> str:
        """Create a new URS token via ``POST <url>/token``.

        :param url: the ``api/users`` endpoint base URL
        :return: the new token, an existing token when the account has hit the
            max-token limit, or '' on failure (empty result triggers a retry)
        """
        token: str = ''
        try:
            headers: Dict = {'Accept': 'application/json'}  # noqa E501
            resp = requests.post(url + "/token", headers=headers, auth=HTTPBasicAuth(self.__username, self.__dwssap))
            # parse the body before the status check so an error payload
            # (e.g. max_token_limit) can still be interpreted
            response_content: Dict = json.loads(resp.content)
            if "error" in response_content:
                if response_content["error"] == "max_token_limit":
                    LOGGER.error("Max tokens acquired from URS. Using existing token")
                    tokens = self.list_tokens(url)
                    return tokens[0]
            # BUGFIX: original raised when status_code < 300, i.e. on success,
            # so a fresh token could never be returned. Error statuses are >= 300.
            if resp.status_code >= 300:
                raise ValueError(f'invalid response code: {resp.status_code}. details: {resp.content}')
            if 'access_token' not in response_content:
                raise ValueError(f'access_token not found. {response_content}')
            token = response_content['access_token']

            # Add better error handling there
            # Max tokens
            # Wrong Username/Password
            # Other
        except:  # noqa E722
            # best-effort: log and return '' so the tenacity retry kicks in
            LOGGER.warning("Error getting the token - check user name and password", exc_info=True)
        return token

    def list_tokens(self, url: str):
        """Return the user's existing token strings via ``GET <url>/tokens``.

        :param url: the ``api/users`` endpoint base URL
        :return: list of access-token strings; empty on any error
        """
        tokens = []
        try:
            headers: Dict = {'Accept': 'application/json'}  # noqa E501
            resp = requests.get(url + "/tokens", headers=headers, auth=HTTPBasicAuth(self.__username, self.__dwssap))
            if resp.status_code >= 400:
                LOGGER.error(f'error response: {resp.status_code}. details: {resp.content}')
                return tokens
            response_content = json.loads(resp.content)

            for x in response_content:
                tokens.append(x['access_token'])

        except:  # noqa E722
            LOGGER.warning("Error getting the token - check user name and password", exc_info=True)
        return tokens

    def setup_earthdata_login_auth(self):
        """
        Set up the request library so that it authenticates against the given
        Earthdata Login endpoint and is able to track cookies between requests.
        This looks in the .netrc file first and if no credentials are found,
        it prompts for them.
        Valid endpoints include:
            urs.earthdata.nasa.gov - Earthdata Login production
        """
        manager = request.HTTPPasswordMgrWithDefaultRealm()
        manager.add_password(None, self.__edl_base_url, self.__username, self.__dwssap)
        auth = request.HTTPBasicAuthHandler(manager)
        # version tag only used to build the User-agent header below
        __version__ = "1.12.0"

        jar = CookieJar()
        processor = request.HTTPCookieProcessor(jar)
        opener = request.build_opener(auth, processor)
        opener.addheaders = [('User-agent', 'unity-downloader-' + __version__)]
        # installs globally: all subsequent urllib requests use this opener
        request.install_opener(opener)

    def delete_token(self, token: str) -> bool:
        """Revoke the given token via ``POST <base>/revoke_token``.

        :param token: the access token to revoke
        :return: True when revocation succeeded, False otherwise
        """
        try:
            self.setup_earthdata_login_auth()
            headers: Dict = {'Accept': 'application/json'}
            resp = requests.post(f'{self.__edl_base_url}revoke_token', params={"token": token}, headers=headers,
                                 auth=HTTPBasicAuth(self.__username, self.__dwssap))

            if resp.status_code == 200:
                LOGGER.info("EDL token successfully deleted")
                return True
            else:
                LOGGER.info("EDL token deleting failed.")

        except:  # noqa E722
            LOGGER.warning("Error deleting the token", exc_info=True)
        return False

    def get_token(self) -> str:
        """Return an existing token when one exists, otherwise create one."""
        token_url = f'{self.__edl_base_url}api/users'
        tokens = self.list_tokens(token_url)
        if len(tokens) == 0:
            return self.create_token(token_url)
        else:
            return tokens[0]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import base64
2+
import logging
3+
import os
4+
5+
from cumulus_lambda_functions.lib.aws.aws_param_store import AwsParamStore
6+
from cumulus_lambda_functions.lib.constants import Constants
7+
from cumulus_lambda_functions.lib.earthdata_login.urs_token import URSToken
8+
9+
LOGGER = logging.getLogger(__name__)
10+
11+
12+
class URSTokenRetriever:
    """Resolve an Earthdata Login (EDL) bearer token from environment configuration.

    If ``EDL_BEARER_TOKEN`` is set it is returned directly; otherwise
    ``EDL_USERNAME`` / ``EDL_PASSWORD`` (optionally encoded per
    ``EDL_PASSWORD_TYPE``) are used to log in through :class:`URSToken`.
    """

    def __get_username_dwssap(self):
        """Return ``(username, password)`` decoded according to EDL_PASSWORD_TYPE.

        Supported types: PLAIN (default), BASE64, or AWS Param Store lookup
        (where the ENV values are treated as parameter names).
        :raises ValueError: on an unknown EDL_PASSWORD_TYPE, or when Param
            Store returns no value
        """
        username = os.environ[Constants.EDL_USERNAME]
        dwssap = os.environ[Constants.EDL_PASSWORD]
        if Constants.EDL_PASSWORD_TYPE not in os.environ:
            # BUGFIX: log previously referenced PASSWORD_TYPE; this method keys on EDL_PASSWORD_TYPE
            LOGGER.debug('no EDL_PASSWORD_TYPE set in ENV. assuming PLAIN STR')
            return username, dwssap
        dwssap_type = os.environ[Constants.EDL_PASSWORD_TYPE]
        if dwssap_type == Constants.PLAIN_STR:
            LOGGER.debug('PLAIN_STR in ENV. returning PLAIN STR')
            return username, dwssap
        if dwssap_type == Constants.BASE64_STR:
            LOGGER.debug('BASE64_STR in ENV. decoding & returning')
            # both username and password are expected to be base64 encoded
            username = base64.standard_b64decode(username.encode('utf-8')).decode('utf-8')
            dwssap = base64.standard_b64decode(dwssap.encode('utf-8')).decode('utf-8')
            return username, dwssap
        if dwssap_type == Constants.PARAM_STORE:
            LOGGER.debug('PARAM_STORE in ENV. retrieving value from Param Store')
            username_param_store = AwsParamStore().get_param(username)
            dwssap_param_store = AwsParamStore().get_param(dwssap)
            if username_param_store is None or dwssap_param_store is None:
                raise ValueError(f'NULL username or password from Param Store. Set the value in {username} AND {dwssap}')
            return username_param_store, dwssap_param_store
        # BUGFIX: error message previously referenced Constants.PASSWORD_TYPE,
        # but this method validates EDL_PASSWORD_TYPE
        raise ValueError(f'invalid {Constants.EDL_PASSWORD_TYPE}. value: {dwssap_type}')

    def start(self):
        """Return an EDL bearer token, logging in when one is not provided via ENV.

        :return: bearer token string
        :raises ValueError: when EDL_USERNAME / EDL_PASSWORD are missing
        """
        if Constants.EDL_BEARER_TOKEN in os.environ:
            # BUGFIX: log previously said "returning UNITY_BEARER_TOKEN"; the value returned is EDL_BEARER_TOKEN
            LOGGER.debug('found EDL_BEARER_TOKEN. returning EDL_BEARER_TOKEN from ENV')
            return os.environ[Constants.EDL_BEARER_TOKEN]
        LOGGER.debug('EDL_BEARER_TOKEN not found. preparing to login')

        missing_mandatory_env = [k for k in [Constants.EDL_USERNAME, Constants.EDL_PASSWORD] if k not in os.environ]
        if len(missing_mandatory_env) > 0:
            raise ValueError(f'missing mandatory ENV for login: {missing_mandatory_env}')
        username, dwssap = self.__get_username_dwssap()
        urs_token = URSToken(username, dwssap, os.environ.get(Constants.EDL_BASE_URL, ''))
        token = urs_token.get_token()
        return token
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import requests
2+
3+
from cumulus_lambda_functions.lib.earthdata_login.urs_token_retriever import URSTokenRetriever
4+
from cumulus_lambda_functions.stage_in_out.download_granules_abstract import DownloadGranulesAbstract
5+
import json
6+
import logging
7+
import os
8+
9+
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
10+
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
11+
12+
LOGGER = logging.getLogger(__name__)
13+
14+
15+
class DownloadGranulesDAAC(DownloadGranulesAbstract):
    """Download granule files from a DAAC over HTTPS using an EDL bearer token.

    Configuration comes from environment variables:
      STAC_JSON    - JSON list of granule results, each carrying an 'assets' dict
      DOWNLOAD_DIR - local directory where downloaded files are written
    """
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
    STAC_JSON = 'STAC_JSON'

    def __init__(self) -> None:
        super().__init__()
        self.__download_dir = '/tmp'
        self.__s3 = AwsS3()
        self.__granules_json = []
        self.__edl_token = None

    def __set_props_from_env(self):
        """Validate mandatory ENV keys, read config, and obtain an EDL token.

        :raises ValueError: when STAC_JSON or DOWNLOAD_DIR is missing
        """
        missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')
        self.__granules_json = json.loads(os.environ.get(self.STAC_JSON))
        self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
        # drop a single trailing slash so path joins below stay clean
        self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
        self.__edl_token = URSTokenRetriever().start()
        return self

    def __get_downloading_urls(self, granules_result: list):
        """Extract each granule's 'assets' mapping from the search results.

        :param granules_result: list of granule result dicts
        :return: list of 'assets' dicts; empty list when there are no granules
        """
        if len(granules_result) < 1:
            # fixed: message has no placeholders, so a plain string (not an f-string)
            LOGGER.warning('cannot find any granules')
            return []
        downloading_urls = [k['assets'] for k in granules_result]
        return downloading_urls

    def __download_one_granule(self, assets: dict):
        """
        Download every asset href of a single granule into the download dir.

        sample assets
          {
            "data": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS",
              "title": "P1570515ATMSSCIENCEAAT16017044853900.PDS",
              "description": "P1570515ATMSSCIENCEAAT16017044853900.PDS"
            },
            "metadata__data": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS",
              "title": "P1570515ATMSSCIENCEAAT16017044853901.PDS",
              "description": "P1570515ATMSSCIENCEAAT16017044853901.PDS"
            },
            "metadata__xml": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
              "title": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
              "description": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml"
            },
            "metadata__cmr": {
              "href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
              "title": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
              "description": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml"
            }
          }
        :param assets: mapping of asset name -> {'href', 'title', 'description'}
        :return: list of assets that failed to download, each annotated with a 'cause'
        """
        error_log = []
        headers = {
            'Authorization': f'Bearer {self.__edl_token}'
        }
        for k, v in assets.items():
            try:
                LOGGER.debug(f'downloading: {v["href"]}')
                r = requests.get(v['href'], headers=headers)
                if r.status_code >= 400:
                    raise RuntimeError(f'wrong response status: {r.status_code}. details: {r.content}')
                # TODO. how to correctly check redirecting to login page
                with open(os.path.join(self.__download_dir, os.path.basename(v["href"])), 'wb') as fd:
                    fd.write(r.content)
            except Exception as e:
                # record the failure and keep downloading the remaining assets
                LOGGER.exception(f'failed to download {v}')
                v['cause'] = str(e)
                error_log.append(v)
        return error_log

    def download(self, **kwargs) -> list:
        """Download all granules described by STAC_JSON into DOWNLOAD_DIR.

        Failures are collected into DOWNLOAD_DIR/error.log instead of aborting.
        :return: the list of assets dicts that were attempted
        """
        self.__set_props_from_env()
        LOGGER.debug(f'creating download dir: {self.__download_dir}')
        FileUtils.mk_dir_p(self.__download_dir)
        downloading_urls = self.__get_downloading_urls(self.__granules_json)
        error_list = []
        for each in downloading_urls:
            LOGGER.debug(f'working on {each}')
            current_error_list = self.__download_one_granule(each)
            error_list.extend(current_error_list)
        if len(error_list) > 0:
            with open(f'{self.__download_dir}/error.log', 'w') as error_file:
                error_file.write(json.dumps(error_list, indent=4))
        return downloading_urls
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
2+
13
class DownloadGranulesFactory:
    """Factory producing a granule downloader for a given download-type key."""
    S3 = 'S3'
    DAAC = 'DAAC'

    def get_class(self, search_type):
        """Return a downloader instance for ``search_type`` ('S3' or 'DAAC').

        Implementations are imported lazily so only the chosen backend's
        dependencies are loaded.
        :raises ValueError: for any unrecognized type key
        """
        if search_type == self.S3:
            from cumulus_lambda_functions.stage_in_out.download_granules_s3 import DownloadGranulesS3
            return DownloadGranulesS3()
        if search_type == self.DAAC:
            from cumulus_lambda_functions.stage_in_out.download_granules_daac import DownloadGranulesDAAC
            return DownloadGranulesDAAC()
        raise ValueError(f'unknown search_type: {search_type}')

docker/stage-in-stage-out/dc-002-download.yaml

Lines changed: 5 additions & 0 deletions
Large diffs are not rendered by default.

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
'pystac', 'jsonschema',
55
'fastjsonschema',
66
'xmltodict',
7+
'tenacity',
78
'requests'
89
]
910

@@ -17,7 +18,7 @@
1718

1819
setup(
1920
name="cumulus_lambda_functions",
20-
version="3.3.1",
21+
version="3.4.0",
2122
packages=find_packages(),
2223
install_requires=install_requires,
2324
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],

0 commit comments

Comments
 (0)