Commit 4ee05e1

ngachung and wphyojpl authored
release/1.5.14 (#56)
* feat: docker script to upload granules via cnm + cnm ingest lambda bugfix
* feat: update terraform to include a new lambda
* breaking: update docker entry script
* fix: missing path in upload script
* fix: checking if querystring is NULL
* fix: added lambda_processing_role_arn as a variable to resolve terraform dependency issues
* fix: version bump

Co-authored-by: Wai Phyo <wai.phyo@jpl.nasa.gov>
1 parent 36b0b06 commit 4ee05e1

File tree

19 files changed, +328 -19 lines changed


cumulus_lambda_functions/cumulus_collections_dapa/cumulus_collections_dapa.py

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ def __init__(self, event):
         self.__cumulus.with_page_number(self.__page_number)
 
     def __assign_values(self):
-        if 'queryStringParameters' not in self.__event:
+        if 'queryStringParameters' not in self.__event or self.__event['queryStringParameters'] is None:
             return self
         query_str_dict = self.__event['queryStringParameters']
         if 'limit' in query_str_dict:
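The extra null check matters because an API Gateway Lambda proxy event includes the 'queryStringParameters' key with a null value when the request carries no query string, so a membership test alone passes and the later lookup blows up. A minimal sketch of the failure mode, with an illustrative event payload:

    # Illustrative API Gateway proxy event for a request with no query string
    event = {'queryStringParameters': None}

    # Old guard: the key exists, so there is no early return here...
    if 'queryStringParameters' not in event:
        print('early return')

    # ...and the subsequent `'limit' in event['queryStringParameters']` raises
    # TypeError: argument of type 'NoneType' is not iterable

    # New guard: handles both the missing key and the null value
    if 'queryStringParameters' not in event or event['queryStringParameters'] is None:
        print('no query parameters; using defaults')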

cumulus_lambda_functions/cumulus_granules_dapa/cumulus_granules_dapa.py

Lines changed: 1 addition & 1 deletion
@@ -74,7 +74,7 @@ def __assign_values(self):
         # if self.__jwt_token[:6].lower() == 'bearer':
         # self.__jwt_token = self.__jwt_token[6:].strip()
         self.__jwt_token = 'NA'
-        if 'queryStringParameters' not in self.__event:
+        if 'queryStringParameters' not in self.__event or self.__event['queryStringParameters'] is None:
             return self
         query_str_dict = self.__event['queryStringParameters']
         if 'datetime' in query_str_dict:

cumulus_lambda_functions/cumulus_granules_dapa_ingest_cnm/cumulus_granules_dapa_ingest_cnm.py

Lines changed: 4 additions & 3 deletions
@@ -53,11 +53,12 @@ def __init__(self, event):
         self.__sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
 
     def __get_json_request_body(self):
-        if 'requestContext' not in self.__event:
-            raise ValueError(f'missing requestContext in {self.__event}')
-        self.__request_body = self.__event['requestContext']
+        if 'body' not in self.__event:
+            raise ValueError(f'missing body in {self.__event}')
+        self.__request_body = json.loads(self.__event['body'])
         validation_result = JsonValidator(REQUEST_BODY_SCHEMA).validate(self.__request_body)
         if validation_result is not None:
+            LOGGER.debug(f'invalid request body: {self.__request_body}')
             raise ValueError(f'invalid cumulus granule json: {validation_result}')
         return self
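This fix matches the API Gateway Lambda proxy event shape: the request payload arrives as a JSON string under 'body', while 'requestContext' holds only request metadata. A small illustration, with made-up field values:

    import json

    # Abridged API Gateway proxy integration event (values are illustrative)
    event = {
        'requestContext': {'httpMethod': 'PUT', 'stage': 'dev'},  # metadata, not the payload
        'body': json.dumps({'provider_id': 'SNPP', 'features': []}),  # payload is a JSON *string*
    }

    # What the fixed code now does before schema validation
    request_body = json.loads(event['body'])
    print(request_body['provider_id'])  # -> SNPP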

cumulus_lambda_functions/cumulus_upload_granules/__init__.py

Whitespace-only changes.

cumulus_lambda_functions/cumulus_upload_granules/upload_granules.py

Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
+import json
+import logging
+import os
+import re
+from collections import defaultdict
+from glob import glob
+
+import requests
+
+from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
+
+LOGGER = logging.getLogger(__name__)
+
+
+class UploadGranules:
+    DAPA_API_KEY = 'DAPA_API'
+    UNITY_BEARER_TOKEN_KEY = 'UNITY_BEARER_TOKEN'
+    COLLECTION_ID_KEY = 'COLLECTION_ID'
+    PROVIDER_ID_KEY = 'PROVIDER_ID'
+    UPLOAD_DIR_KEY = 'UPLOAD_DIR'
+    STAGING_BUCKET_KEY = 'STAGING_BUCKET'
+
+    VERIFY_SSL_KEY = 'VERIFY_SSL'
+    DELETE_FILES_KEY = 'DELETE_FILES'
+
+    def __init__(self):
+        self.__dapa_api = ''
+        self.__unity_bearer_token = ''
+        self.__collection_id = ''
+        self.__collection_details = {}
+        self.__uploading_granules = []
+        self.__provider_id = ''
+        self.__staging_bucket = ''
+        self.__upload_dir = '/tmp'
+        self.__verify_ssl = True
+        self.__delete_files = False
+        self.__s3 = AwsS3()
+        self.__raw_files = []
+
+    def __set_props_from_env(self):
+        missing_keys = [k for k in [self.DAPA_API_KEY, self.COLLECTION_ID_KEY, self.PROVIDER_ID_KEY, self.UPLOAD_DIR_KEY, self.UNITY_BEARER_TOKEN_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
+        if len(missing_keys) > 0:
+            raise ValueError(f'missing environment keys: {missing_keys}')
+
+        self.__dapa_api = os.environ.get(self.DAPA_API_KEY)
+        self.__dapa_api = self.__dapa_api[:-1] if self.__dapa_api.endswith('/') else self.__dapa_api
+        self.__unity_bearer_token = os.environ.get(self.UNITY_BEARER_TOKEN_KEY)
+        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
+        self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
+        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
+
+        self.__upload_dir = os.environ.get(self.UPLOAD_DIR_KEY)
+        self.__upload_dir = self.__upload_dir[:-1] if self.__upload_dir.endswith('/') else self.__upload_dir
+
+        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
+        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
+        return self
+
+    def __get_collection_stac(self):
+        LOGGER.debug(f'getting collection details for: {self.__collection_id}')
+        header = {'Authorization': f'Bearer {self.__unity_bearer_token}'}
+        dapa_collection_url = f'{self.__dapa_api}/am-uds-dapa/collections?limit=1000'
+        # TODO need better endpoint to get exactly 1 collection
+        # TODO pagination?
+        response = requests.get(url=dapa_collection_url, headers=header, verify=self.__verify_ssl)
+        if response.status_code > 400:
+            raise RuntimeError(f'querying collections ends in error. status_code: {response.status_code}. url: {dapa_collection_url}. details: {response.text}')
+        collections_result = json.loads(response.text)
+        if 'features' not in collections_result:
+            raise RuntimeError(f'missing features in response. invalid response: {collections_result}')
+        LOGGER.debug(f'searching for collection_id: {self.__collection_id}')
+        collection_details = [each_collection for each_collection in collections_result['features'] if self.__collection_id == each_collection["id"]]
+        if len(collection_details) < 1:
+            raise RuntimeError(f'unable to find collection in DAPA: {self.__collection_id}')
+        self.__collection_details = collection_details[0]
+        return self
+
+    def __sort_granules(self):
+        file_regex_list = {k['type']: k['href'].split('___')[-1] for k in self.__collection_details['links'] if not k['title'].endswith('cmr.xml')}
+        granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction']
+        granules = defaultdict(dict)
+        for each_file in self.__raw_files:
+            each_filename = os.path.basename(each_file)
+            each_granule_id = re.findall(granule_id_extraction, each_filename)
+            if len(each_granule_id) < 1:
+                LOGGER.warning(f'skipping file that cannot be matched to granule_id: {each_file}')
+                continue
+            each_granule_id = each_granule_id[0]
+            if isinstance(each_granule_id, tuple):
+                each_granule_id = each_granule_id[0]
+            data_type = [k for k, v in file_regex_list.items() if len(re.findall(v, each_filename)) > 0]
+            if len(data_type) != 1:
+                LOGGER.warning(f'skipping file that cannot be matched to a datatype: {each_file}.. data_type: {data_type}')
+                continue
+            data_type = data_type[0]
+            if data_type in granules[each_granule_id]:
+                LOGGER.warning(f'duplicated data type: {data_type}. file: {each_file}. existing data_type: {granules[each_granule_id][data_type]}')
+                continue
+            granules[each_granule_id][data_type] = {
+                'href': each_file
+            }
+        LOGGER.debug(f'filtering granules w/o data. original len: {len(granules)}. original granules: {granules}')
+        granules = {k: v for k, v in granules.items() if 'data' in v}
+        LOGGER.debug(f'filtered granules. new len: {len(granules)}. granules: {granules}')
+        return granules
+
+    def __upload_granules(self, granule_assets: dict, granule_id: str):
+        for data_type, href_dict in granule_assets.items():
+            LOGGER.debug(f'uploading {href_dict}')
+            s3_url = self.__s3.upload(href_dict['href'], self.__staging_bucket, granule_id, self.__delete_files)
+            href_dict['href'] = s3_url
+        return self
+
+    def __execute_dapa_cnm_ingestion(self, cnm_ingest_body: dict):
+        dapa_ingest_cnm_api = f'{self.__dapa_api}/am-uds-dapa/collections/'
+        LOGGER.debug(f'ingesting granules via: {dapa_ingest_cnm_api}')
+        header = {
+            'Authorization': f'Bearer {self.__unity_bearer_token}',
+            'Content-Type': 'application/json',
+        }
+        response = requests.put(url=dapa_ingest_cnm_api, headers=header, verify=self.__verify_ssl, data=json.dumps(cnm_ingest_body))
+        if response.status_code > 400:
+            raise RuntimeError(
+                f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
+        granules_result = response.text
+        return granules_result
+
+    def start(self):
+        """
+        1. recursively get all files from upload dir
+        2. use collection id to get the links
+        3. group files from step-1 into granules
+        4. get granule ID ???
+        5. upload to staging bucket with granuleID as key
+        6. call DAPA endpoint to start the registration to cumulus
+        :return:
+        """
+        self.__set_props_from_env()
+        LOGGER.debug(f'listing files recursively in dir: {self.__upload_dir}')
+        self.__raw_files = glob(f'{self.__upload_dir}/**/*', recursive=True)
+        self.__get_collection_stac()
+        on_disk_granules = self.__sort_granules()
+        LOGGER.debug(f'on_disk_granules: {on_disk_granules}')
+        dapa_body_granules = []
+        for granule_id, granule_hrefs in on_disk_granules.items():
+            self.__upload_granules(granule_hrefs, granule_id)
+            dapa_body_granules.append({
+                'id': granule_id,
+                'collection': self.__collection_id,
+                'assets': granule_hrefs,
+            })
+        LOGGER.debug(f'dapa_body_granules: {dapa_body_granules}')
+        dapa_body = {
+            "provider_id": self.__provider_id,
+            "features": dapa_body_granules
+        }
+        LOGGER.debug(f'dapa_body: {dapa_body}')
+        dapa_ingest_result = self.__execute_dapa_cnm_ingestion(dapa_body)
+        return dapa_ingest_result
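In __sort_granules, granule IDs come from the collection's granuleIdExtraction regex; when that pattern contains more than one capture group, re.findall returns tuples rather than strings, which is why the tuple check exists. A standalone sketch with an invented pattern and filename:

    import re

    # Hypothetical values resembling a Cumulus collection configuration
    granule_id_extraction = r'(SNDR\.SNPP\.ATMS\..+)\.(nc)'
    filename = 'SNDR.SNPP.ATMS.L1A.20220401T0000.nc'

    matches = re.findall(granule_id_extraction, filename)
    # -> [('SNDR.SNPP.ATMS.L1A.20220401T0000', 'nc')]
    granule_id = matches[0]
    if isinstance(granule_id, tuple):  # multiple capture groups yield a tuple per match
        granule_id = granule_id[0]
    print(granule_id)  # -> SNDR.SNPP.ATMS.L1A.20220401T0000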

cumulus_lambda_functions/docker_entrypoint/__init__.py

Whitespace-only changes.

cumulus_lambda_functions/docker_entrypoint/__main__.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+import logging
+import os
+from sys import argv
+
+from cumulus_lambda_functions.cumulus_download_granules.download_granules import DownloadGranules
+from cumulus_lambda_functions.cumulus_upload_granules.upload_granules import UploadGranules
+
+
+def choose_process():
+    if argv[1].strip().upper() == 'DOWNLOAD':
+        logging.info('starting DOWNLOAD script')
+        DownloadGranules().start()
+    elif argv[1].strip().upper() == 'UPLOAD':
+        logging.info('starting UPLOAD script')
+        logging.info(UploadGranules().start())
+    else:
+        raise ValueError(f'invalid argument: {argv}')
+    return
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=int(os.environ.get('LOG_LEVEL', '10')),
+                        format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
+    choose_process()
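With the ENTRYPOINT change below, the container command supplies the mode, which lands in argv[1]. The same dispatch can be exercised outside Docker; a sketch, assuming the package is importable and the relevant environment variables are already set:

    import subprocess
    import sys

    # Equivalent to the container's ENTRYPOINT plus command: ["upload"]
    subprocess.run(
        [sys.executable, '-m', 'cumulus_lambda_functions.docker_entrypoint', 'upload'],
        check=True,
    )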

cumulus_lambda_functions/lib/aws/aws_s3.py

Lines changed: 49 additions & 0 deletions
@@ -1,6 +1,8 @@
 import logging
 import os
 from io import BytesIO
+from typing import Union
+
 from cumulus_lambda_functions.lib.aws.aws_cred import AwsCred
 from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
 
@@ -16,6 +18,53 @@ def __init__(self):
         self.__target_bucket = None
         self.__target_key = None
 
+    def __upload_to_s3(self, bucket, prefix, file_path, delete_files=False, add_size=True, other_tags={}, s3_name=None):
+        """
+        Uploads a file to S3.
+
+        :param bucket: string - name of bucket
+        :param prefix: string - prefix. don't start or end with `/` to avoid extra unnamed dirs
+        :param file_path: string - absolute path of file location
+        :param delete_files: boolean - delete the original file after upload. default: False
+        :param add_size: boolean - add the file size as a tag. default: True
+        :param other_tags: dict - additional tags as key-value pairs
+        :param s3_name: string - name for the uploaded object if the user wishes to change it.
+                        the actual filename is used if not provided. default: None
+        :return: string - s3:// URL of the uploaded object
+        """
+        tags = {
+            'TagSet': []
+        }
+        if add_size is True:
+            tags['TagSet'].append({
+                'Key': 'org_size',
+                'Value': str(FileUtils.get_size(file_path))
+            })
+        for key, val in other_tags.items():
+            tags['TagSet'].append({
+                'Key': key,
+                'Value': str(val)
+            })
+        if s3_name is None:
+            s3_name = os.path.basename(file_path)
+        s3_key = '{}/{}'.format(prefix, s3_name)
+        self.__s3_client.upload_file(file_path, bucket, s3_key, ExtraArgs={'ServerSideEncryption': 'AES256'})
+        if delete_files is True:  # deleting local files
+            FileUtils.remove_if_exists(file_path)
+        if len(tags['TagSet']) > 0:
+            try:
+                self.__s3_client.put_object_tagging(Bucket=bucket, Key=s3_key, Tagging=tags)
+            except Exception as e:
+                LOGGER.exception(f'error while adding tags: {tags} to {bucket}/{s3_key}')
+                raise e
+        return f's3://{bucket}/{s3_key}'
+
+    def upload(self, file_path: str, base_path: str, relative_parent_path: str, delete_files: bool,
+               s3_name: Union[str, None] = None, obj_tags: dict = {}, overwrite: bool = False):
+        s3_url = self.__upload_to_s3(base_path, relative_parent_path, file_path, delete_files, True, obj_tags, s3_name)
+        if delete_files is True:  # already removed by __upload_to_s3; remove_if_exists is a harmless no-op here
+            FileUtils.remove_if_exists(file_path)
+        return s3_url
+
     def get_s3_stream(self):
         return self.__s3_client.get_object(Bucket=self.__target_bucket, Key=self.__target_key)['Body']
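As used by UploadGranules above, base_path is the staging bucket and relative_parent_path is the granule ID, so objects land under s3://<bucket>/<granule_id>/<filename>. A sketch with placeholder values (a real call needs valid AWS credentials):

    from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

    # Illustrative values only
    s3_url = AwsS3().upload(
        '/tmp/snpp_upload_test_1/SNDR.SNPP.ATMS.L1A.20220401T0000.nc',  # local file
        'am-uds-dev-cumulus-staging',                                   # staging bucket
        'SNDR.SNPP.ATMS.L1A.20220401T0000',                             # granule ID as key prefix
        False,                                                          # keep the local file
    )
    # -> s3://am-uds-dev-cumulus-staging/SNDR.SNPP.ATMS.L1A.20220401T0000/SNDR.SNPP.ATMS.L1A.20220401T0000.nc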

cumulus_lambda_functions/lib/utils/file_utils.py

Lines changed: 6 additions & 0 deletions
@@ -23,6 +23,12 @@
 
 
 class FileUtils:
+    @staticmethod
+    def remove_if_exists(file_path):
+        if os.path.exists(file_path) and os.path.isfile(file_path):
+            os.remove(file_path)
+        return
+
     @staticmethod
     def mk_dir_p(dir_path):
         Path(dir_path).mkdir(parents=True, exist_ok=True)

docker/Dockerfile_download_granules.jpl

Lines changed: 1 addition & 2 deletions
@@ -12,5 +12,4 @@ COPY setup.py /usr/src/app/unity/setup.py
 COPY cumulus_lambda_functions /usr/src/app/unity/cumulus_lambda_functions
 WORKDIR /usr/src/app/unity
 
-
-CMD ["python","-m", "cumulus_lambda_functions.cumulus_download_granules"]
+ENTRYPOINT ["python", "-m", "cumulus_lambda_functions.docker_entrypoint"]

docker/Dockerfile_download_granules.public

Lines changed: 1 addition & 2 deletions
@@ -12,5 +12,4 @@ COPY setup.py /usr/src/app/unity/setup.py
 COPY cumulus_lambda_functions /usr/src/app/unity/cumulus_lambda_functions
 WORKDIR /usr/src/app/unity
 
-
-CMD ["python","-m", "cumulus_lambda_functions.cumulus_download_granules"]
+ENTRYPOINT ["python", "-m", "cumulus_lambda_functions.docker_entrypoint"]
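Switching from CMD to ENTRYPOINT pins the image to the dispatcher module while leaving the mode selectable: a compose `command` or trailing `docker run` argument is appended to the entrypoint as argv, which choose_process reads to pick download vs. upload. The old CMD would instead have been replaced wholesale by any user-supplied command.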

docker/docker-compose-granules-download.yaml

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ services:
     image: cumulus_unity:1.0.0-t1
     volumes:
       - /tmp/cumulus_granules:/etc/granules
+    command: ["download"]
     environment:
       UNITY_BEARER_TOKEN: 'token without the header "Bearer"'
       AWS_ACCESS_KEY_ID: 'dd'
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+version: "3.7"
+services:
+  cumulus_granules_upload:
+    image: cumulus_unity:1.0.0-t1
+    volumes:
+      - /tmp/snpp_upload_test_1:/etc/snpp_upload_test_1
+    command: ["upload"]
+    environment:
+      AWS_ACCESS_KEY_ID: 'dd'
+      AWS_SECRET_ACCESS_KEY: 'dddd'
+      AWS_SESSION_TOKEN: 'dddd'
+      AWS_REGION: 'us-west-2'
+      UNITY_BEARER_TOKEN: 'token without the header "Bearer"'
+      DAPA_API: 'https://k3a3qmarxh.execute-api.us-west-2.amazonaws.com/dev'
+      COLLECTION_ID: 'SNDR_SNPP_ATMS_L1A___1'
+      STAGING_BUCKET: 'am-uds-dev-cumulus-staging'
+      UPLOAD_DIR: '/etc/snpp_upload_test_1'
+      PROVIDER_ID: 'SNPP'
+      VERIFY_SSL: 'FALSE'
+      DELETE_FILES: 'FALSE'
+      LOG_LEVEL: '20'
+    networks:
+      - internal
+networks:
+  internal:
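Each environment key here maps one-to-one onto the UploadGranules constants above, and UPLOAD_DIR points at the container-side path of the volume mount, so files staged on the host under /tmp/snpp_upload_test_1 are what get grouped and uploaded.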

setup.py

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 
 setup(
     name="cumulus_lambda_functions",
-    version="1.5.13",
+    version="1.5.14",
     packages=find_packages(),
     install_requires=install_requires,
     tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage'],

tests/cumulus_lambda_functions/cumulus_upload_granules/__init__.py

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+import os
+
+
+os.environ['DAPA_API'] = 'https://k3a3qmarxh.execute-api.us-west-2.amazonaws.com/dev/am-uds-dapa'
+os.environ['UNITY_BEARER_TOKEN'] = 'abcd.abcd.abcd-abcd-abcd'
+os.environ['COLLECTION_ID'] = 'SNDR_SNPP_ATMS_L1A___1'
+os.environ['PROVIDER_ID'] = 'SNPP'
+os.environ['UPLOAD_DIR'] = '/tmp/snpp_upload_test_1'
+os.environ['STAGING_BUCKET'] = 'am-uds-dev-cumulus-staging'
+os.environ['VERIFY_SSL'] = 'false'
+os.environ['DELETE_FILES'] = 'false'
+
+from cumulus_lambda_functions.cumulus_upload_granules.upload_granules import UploadGranules
+print(UploadGranules().start())
