release/1.5.14 #56

Merged 8 commits on Jul 12, 2022
@@ -28,7 +28,7 @@ def __init__(self, event):
        self.__cumulus.with_page_number(self.__page_number)

    def __assign_values(self):
        if 'queryStringParameters' not in self.__event:
        if 'queryStringParameters' not in self.__event or self.__event['queryStringParameters'] is None:
            return self
        query_str_dict = self.__event['queryStringParameters']
        if 'limit' in query_str_dict:
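A note on the guard above: in an API Gateway proxy event the key queryStringParameters is present but explicitly null when the request carries no query string, so the membership check alone is not enough. A minimal sketch of the cases the new condition covers (the event shapes are illustrative stand-ins, not taken from this repository's tests):

def has_query_params(event: dict) -> bool:
    # mirrors the merged condition: treat a missing key and an explicit None the same way
    return 'queryStringParameters' in event and event['queryStringParameters'] is not None

assert has_query_params({}) is False                                    # key absent entirely
assert has_query_params({'queryStringParameters': None}) is False       # key present, value None
assert has_query_params({'queryStringParameters': {'limit': '10'}}) is True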
@@ -74,7 +74,7 @@ def __assign_values(self):
        # if self.__jwt_token[:6].lower() == 'bearer':
        #     self.__jwt_token = self.__jwt_token[6:].strip()
        self.__jwt_token = 'NA'
        if 'queryStringParameters' not in self.__event:
        if 'queryStringParameters' not in self.__event or self.__event['queryStringParameters'] is None:
            return self
        query_str_dict = self.__event['queryStringParameters']
        if 'datetime' in query_str_dict:
@@ -53,11 +53,12 @@ def __init__(self, event):
        self.__sns_topic_arn = os.getenv('SNS_TOPIC_ARN')

    def __get_json_request_body(self):
        if 'requestContext' not in self.__event:
            raise ValueError(f'missing requestContext in {self.__event}')
        self.__request_body = self.__event['requestContext']
        if 'body' not in self.__event:
            raise ValueError(f'missing body in {self.__event}')
        self.__request_body = json.loads(self.__event['body'])
        validation_result = JsonValidator(REQUEST_BODY_SCHEMA).validate(self.__request_body)
        if validation_result is not None:
            LOGGER.debug(f'invalid request body: {self.__request_body}')
            raise ValueError(f'invalid cumulus granule json: {validation_result}')
        return self
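For context on __get_json_request_body: the handler expects a Lambda proxy-style event in which body arrives as a JSON-encoded string rather than a dict, hence the json.loads call before schema validation. A minimal, illustrative event follows; the payload keys mirror the dapa_body that upload_granules.py (added below) sends, while the authoritative shape is REQUEST_BODY_SCHEMA, which is not part of this diff:

import json

sample_event = {
    'requestContext': {'httpMethod': 'PUT'},                         # presence is checked first
    'body': json.dumps({'provider_id': 'SNPP', 'features': []}),     # JSON string, parsed with json.loads
}
request_body = json.loads(sample_event['body'])                      # what the handler validates against the schema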
Empty file.
160 changes: 160 additions & 0 deletions cumulus_lambda_functions/cumulus_upload_granules/upload_granules.py
@@ -0,0 +1,160 @@
import json
import logging
import os
import re
from collections import defaultdict
from glob import glob

import requests

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = logging.getLogger(__name__)


class UploadGranules:
    DAPA_API_KEY = 'DAPA_API'
    UNITY_BEARER_TOKEN_KEY = 'UNITY_BEARER_TOKEN'
    COLLECTION_ID_KEY = 'COLLECTION_ID'
    PROVIDER_ID_KEY = 'PROVIDER_ID'
    UPLOAD_DIR_KEY = 'UPLOAD_DIR'
    STAGING_BUCKET_KEY = 'STAGING_BUCKET'

    VERIFY_SSL_KEY = 'VERIFY_SSL'
    DELETE_FILES_KEY = 'DELETE_FILES'

    def __init__(self):
        self.__dapa_api = ''
        self.__unity_bearer_token = ''
        self.__collection_id = ''
        self.__collection_details = {}
        self.__uploading_granules = []
        self.__provider_id = ''
        self.__staging_bucket = ''
        self.__upload_dir = '/tmp'
        self.__verify_ssl = True
        self.__delete_files = False
        self.__s3 = AwsS3()
        self.__raw_files = []

    def __set_props_from_env(self):
        missing_keys = [k for k in [self.DAPA_API_KEY, self.COLLECTION_ID_KEY, self.PROVIDER_ID_KEY, self.UPLOAD_DIR_KEY, self.UNITY_BEARER_TOKEN_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__dapa_api = os.environ.get(self.DAPA_API_KEY)
        self.__dapa_api = self.__dapa_api[:-1] if self.__dapa_api.endswith('/') else self.__dapa_api
        self.__unity_bearer_token = os.environ.get(self.UNITY_BEARER_TOKEN_KEY)
        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)

        self.__upload_dir = os.environ.get(self.UPLOAD_DIR_KEY)
        self.__upload_dir = self.__upload_dir[:-1] if self.__upload_dir.endswith('/') else self.__upload_dir

        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
        return self

    def __get_collection_stac(self):
        LOGGER.debug(f'getting collection details for: {self.__collection_id}')
        header = {'Authorization': f'Bearer {self.__unity_bearer_token}'}
        dapa_collection_url = f'{self.__dapa_api}/am-uds-dapa/collections?limit=1000'
        # TODO need better endpoint to get exactly 1 collection
        # TODO pagination?
        response = requests.get(url=dapa_collection_url, headers=header, verify=self.__verify_ssl)
        if response.status_code > 400:
            raise RuntimeError(f'querying collections ends in error. status_code: {response.status_code}. url: {dapa_collection_url}. details: {response.text}')
        collections_result = json.loads(response.text)
        if 'features' not in collections_result:
            raise RuntimeError(f'missing features in response. invalid response: response: {collections_result}')
        print(self.__collection_id)
        collection_details = [each_collection for each_collection in collections_result['features'] if self.__collection_id == each_collection["id"]]
        if len(collection_details) < 1:
            raise RuntimeError(f'unable to find collection in DAPA')
        self.__collection_details = collection_details[0]
        return self

    def __sort_granules(self):
        file_regex_list = {k['type']: k['href'].split('___')[-1] for k in self.__collection_details['links'] if not k['title'].endswith('cmr.xml')}
        granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction']
        granules = defaultdict(dict)
        for each_file in self.__raw_files:
            each_filename = os.path.basename(each_file)
            each_granule_id = re.findall(granule_id_extraction, each_filename)
            if len(each_granule_id) < 1:
                LOGGER.warning(f'skipping file that cannot be matched to granule_id: {each_file}')
                continue
            each_granule_id = each_granule_id[0]
            if isinstance(each_granule_id, tuple):
                each_granule_id = each_granule_id[0]
            data_type = [k for k, v in file_regex_list.items() if len(re.findall(v, each_filename)) > 0]
            if len(data_type) != 1:
                LOGGER.warning(f'skipping file that cannot be matched to a datatype: {each_file}.. data_type: {data_type}')
                continue
            data_type = data_type[0]
            if data_type in granules[each_granule_id]:
                LOGGER.warning(f'duplicated data type: {data_type}. file: {each_file}. existing data_type: {granules[each_granule_id][data_type]}')
                continue
            granules[each_granule_id][data_type] = {
                'href': each_file
            }
        LOGGER.debug(f'filtering granules w/o data. original len: {len(granules)} original granules: {granules}')
        granules = {k: v for k, v in granules.items() if 'data' in v}
        LOGGER.debug(f'filtered granules. original len: {len(granules)}. granules: {granules}')
        return granules

    def __upload_granules(self, granule_assets: dict, granule_id: str):
        for data_type, href_dict in granule_assets.items():
            LOGGER.debug(f'uploading {href_dict}')
            s3_url = self.__s3.upload(href_dict['href'], self.__staging_bucket, granule_id, self.__delete_files)
            href_dict['href'] = s3_url
        return self

    def __execute_dapa_cnm_ingestion(self, cnm_ingest_body: dict):
        dapa_ingest_cnm_api = f'{self.__dapa_api}/am-uds-dapa/collections/'
        LOGGER.debug(f'getting granules for: {dapa_ingest_cnm_api}')
        header = {
            'Authorization': f'Bearer {self.__unity_bearer_token}',
            'Content-Type': 'application/json',
        }
        response = requests.put(url=dapa_ingest_cnm_api, headers=header, verify=self.__verify_ssl, data=json.dumps(cnm_ingest_body))
        if response.status_code > 400:
            raise RuntimeError(
                f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
        granules_result = response.text
        return granules_result

    def start(self):
        """

        1. recursively get all files from upload dir
        2. use collection id to get the links
        3. group files from step-1 into granules
        4. get granule ID ???
        5. upload to staging bucket with granuleID as key
        6. call DAPA endpoint to start the registration to cumulus
        :return:
        """
        self.__set_props_from_env()
        LOGGER.debug(f'listing files recursively in dir: {self.__upload_dir}')
        self.__raw_files = glob(f'{self.__upload_dir}/**/*', recursive=True)
        self.__get_collection_stac()
        on_disk_granules = self.__sort_granules()
        LOGGER.debug(f'on_disk_granules: {on_disk_granules}')
        dapa_body_granules = []
        for granule_id, granule_hrefs in on_disk_granules.items():
            self.__upload_granules(granule_hrefs, granule_id)
            dapa_body_granules.append({
                'id': granule_id,
                'collection': self.__collection_id,
                'assets': granule_hrefs,
            })
        LOGGER.debug(f'dapa_body_granules: {dapa_body_granules}')
        dapa_body = {
            "provider_id": self.__provider_id,
            "features": dapa_body_granules
        }
        LOGGER.debug(f'dapa_body: {dapa_body}')
        dapa_ingest_result = self.__execute_dapa_cnm_ingestion(dapa_body)
        return dapa_ingest_result
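A reviewer note on __sort_granules: granuleIdExtraction comes from the collection's STAC summaries, and re.findall returns one tuple per match when the pattern contains more than one capture group, which is why the result is unwrapped twice above. A small, self-contained illustration (the pattern and filename are made-up stand-ins, not the real SNDR_SNPP_ATMS_L1A regex):

import re

granule_id_extraction = r'^(SNDR\.SNPP\.ATMS\.([^.]+))\.nc$'   # hypothetical pattern with nested groups
filename = 'SNDR.SNPP.ATMS.L1A-20220712T0000.nc'

matches = re.findall(granule_id_extraction, filename)          # [('SNDR.SNPP.ATMS.L1A-20220712T0000', 'L1A-20220712T0000')]
granule_id = matches[0]
if isinstance(granule_id, tuple):                              # multiple groups -> findall yields tuples
    granule_id = granule_id[0]
print(granule_id)                                              # SNDR.SNPP.ATMS.L1A-20220712T0000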
Empty file.
24 changes: 24 additions & 0 deletions cumulus_lambda_functions/docker_entrypoint/__main__.py
@@ -0,0 +1,24 @@
import logging
import os
from sys import argv

from cumulus_lambda_functions.cumulus_download_granules.download_granules import DownloadGranules
from cumulus_lambda_functions.cumulus_upload_granules.upload_granules import UploadGranules


def choose_process():
    if argv[1].strip().upper() == 'DOWNLOAD':
        logging.info('starting DOWNLOAD script')
        DownloadGranules().start()
    elif argv[1].strip().upper() == 'UPLOAD':
        logging.info('starting UPLOAD script')
        logging.info(UploadGranules().start())
    else:
        raise ValueError(f'invalid argument: {argv}')
    return


if __name__ == '__main__':
    logging.basicConfig(level=int(os.environ.get('LOG_LEVEL', '10')),
                        format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
    choose_process()
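This entrypoint is what the Dockerfile changes below switch to: CMD is replaced by an ENTRYPOINT on cumulus_lambda_functions.docker_entrypoint, and the docker-compose files pass ["download"] or ["upload"] as the command, which Docker appends to the ENTRYPOINT and choose_process() reads as argv[1]. Outside a container the same selection works as, for example, python -m cumulus_lambda_functions.docker_entrypoint upload, provided the environment variables listed in the compose files below are set.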
49 changes: 49 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_s3.py
@@ -1,6 +1,8 @@
import logging
import os
from io import BytesIO
from typing import Union

from cumulus_lambda_functions.lib.aws.aws_cred import AwsCred
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

@@ -16,6 +18,53 @@ def __init__(self):
        self.__target_bucket = None
        self.__target_key = None

    def __upload_to_s3(self, bucket, prefix, file_path, delete_files=False, add_size=True, other_tags={}, s3_name=None):
        """
        Uploading a file to S3
        :param bucket: string - name of bucket
        :param prefix: string - prefix. don't start and end with `/` to avoid extra unnamed dirs
        :param file_path: string - absolute path of file location
        :param delete_files: boolean - deleting original file. default: False
        :param add_size: boolean - adding the file size as tag. default: True
        :param other_tags: dict - key-value pairs as a dictionary
        :param s3_name: string - name of s3 file if the user wishes to change.
                using the actual filename if not provided. defaulted to None
        :return: string - `s3://<bucket>/<key>` URL of the uploaded object
        """
        tags = {
            'TagSet': []
        }
        if add_size is True:
            tags['TagSet'].append({
                'Key': 'org_size',
                'Value': str(FileUtils.get_size(file_path))
            })
        for key, val in other_tags.items():
            tags['TagSet'].append({
                'Key': key,
                'Value': str(val)
            })
        if s3_name is None:
            s3_name = os.path.basename(file_path)
        s3_key = '{}/{}'.format(prefix, s3_name)
        self.__s3_client.upload_file(file_path, bucket, s3_key, ExtraArgs={'ServerSideEncryption': 'AES256'})
        if delete_files is True:  # deleting local files
            FileUtils.remove_if_exists(file_path)
        if len(tags['TagSet']) > 0:
            try:
                self.__s3_client.put_object_tagging(Bucket=bucket, Key=s3_key, Tagging=tags)
            except Exception as e:
                LOGGER.exception(f'error while adding tags: {tags} to {bucket}/{s3_key}')
                raise e
        return f's3://{bucket}/{s3_key}'

    def upload(self, file_path: str, base_path: str, relative_parent_path: str, delete_files: bool,
               s3_name: Union[str, None] = None, obj_tags: dict = {}, overwrite: bool = False):
        s3_url = self.__upload_to_s3(base_path, relative_parent_path, file_path, delete_files, True, obj_tags, s3_name)
        if delete_files is True:  # deleting local files
            FileUtils.remove_if_exists(file_path)
        return s3_url

    def get_s3_stream(self):
        return self.__s3_client.get_object(Bucket=self.__target_bucket, Key=self.__target_key)['Body']
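A minimal sketch of how upload_granules.py drives the new upload method (the bucket name and file path are placeholders, and valid AWS credentials are required for this to actually run):

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

s3 = AwsS3()
# uploads /tmp/example/file.nc to s3://my-staging-bucket/GRANULE_ID/file.nc with an org_size tag,
# and keeps the local copy because delete_files is False
s3_url = s3.upload('/tmp/example/file.nc', 'my-staging-bucket', 'GRANULE_ID', False)
print(s3_url)  # s3://my-staging-bucket/GRANULE_ID/file.nc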
6 changes: 6 additions & 0 deletions cumulus_lambda_functions/lib/utils/file_utils.py
@@ -23,6 +23,12 @@


class FileUtils:
    @staticmethod
    def remove_if_exists(file_path):
        if os.path.exists(file_path) and os.path.isfile(file_path):
            os.remove(file_path)
        return

    @staticmethod
    def mk_dir_p(dir_path):
        Path(dir_path).mkdir(parents=True, exist_ok=True)
3 changes: 1 addition & 2 deletions docker/Dockerfile_download_granules.jpl
@@ -12,5 +12,4 @@ COPY setup.py /usr/src/app/unity/setup.py
COPY cumulus_lambda_functions /usr/src/app/unity/cumulus_lambda_functions
WORKDIR /usr/src/app/unity


CMD ["python","-m", "cumulus_lambda_functions.cumulus_download_granules"]
ENTRYPOINT ["python", "-m", "cumulus_lambda_functions.docker_entrypoint"]
3 changes: 1 addition & 2 deletions docker/Dockerfile_download_granules.public
@@ -12,5 +12,4 @@ COPY setup.py /usr/src/app/unity/setup.py
COPY cumulus_lambda_functions /usr/src/app/unity/cumulus_lambda_functions
WORKDIR /usr/src/app/unity


CMD ["python","-m", "cumulus_lambda_functions.cumulus_download_granules"]
ENTRYPOINT ["python", "-m", "cumulus_lambda_functions.docker_entrypoint"]
1 change: 1 addition & 0 deletions docker/docker-compose-granules-download.yaml
@@ -4,6 +4,7 @@ services:
    image: cumulus_unity:1.0.0-t1
    volumes:
      - /tmp/cumulus_granules:/etc/granules
    command: ["download"]
    environment:
      UNITY_BEARER_TOKEN: 'token without the header "Bearer"'
      AWS_ACCESS_KEY_ID: 'dd'
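The command: ["download"] entry works together with the ENTRYPOINT switch above: Docker appends the list to the entrypoint, so this service effectively runs python -m cumulus_lambda_functions.docker_entrypoint download, while the new upload compose file below passes upload.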
25 changes: 25 additions & 0 deletions docker/docker-compose-granules-upload.yaml
@@ -0,0 +1,25 @@
version: "3.7"
services:
  cumulus_granules_upload:
    image: cumulus_unity:1.0.0-t1
    volumes:
      - /tmp/snpp_upload_test_1:/etc/snpp_upload_test_1
    command: ["upload"]
    environment:
      AWS_ACCESS_KEY_ID: 'dd'
      AWS_SECRET_ACCESS_KEY: 'dddd'
      AWS_SESSION_TOKEN: 'dddd'
      AWS_REGION: 'us-west-2'
      UNITY_BEARER_TOKEN: 'token without the header "Bearer"'
      DAPA_API: 'https://k3a3qmarxh.execute-api.us-west-2.amazonaws.com/dev'
      COLLECTION_ID: 'SNDR_SNPP_ATMS_L1A___1'
      STAGING_BUCKET: 'am-uds-dev-cumulus-staging'
      UPLOAD_DIR: '/etc/snpp_upload_test_1'
      PROVIDER_ID: 'SNPP'
      VERIFY_SSL: 'FALSE'
      DELETE_FILES: 'FALSE'
      LOG_LEVEL: '20'
    networks:
      - internal
networks:
  internal:
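Worth noting for anyone adapting this file: VERIFY_SSL and DELETE_FILES are compared case-insensitively against the string 'TRUE' in UploadGranules, LOG_LEVEL '20' maps to logging.INFO (the entrypoint defaults to '10', DEBUG), and UPLOAD_DIR must point at the container-side path of the volume mount (/etc/snpp_upload_test_1 here).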
2 changes: 1 addition & 1 deletion setup.py
@@ -16,7 +16,7 @@

setup(
    name="cumulus_lambda_functions",
    version="1.5.13",
    version="1.5.14",
    packages=find_packages(),
    install_requires=install_requires,
    tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage'],
Empty file.
@@ -0,0 +1,14 @@
import os


os.environ['DAPA_API'] = 'https://k3a3qmarxh.execute-api.us-west-2.amazonaws.com/dev/am-uds-dapa'
os.environ['UNITY_BEARER_TOKEN'] = 'abcd.abcd.abcd-abcd-abcd'
os.environ['COLLECTION_ID'] = 'SNDR_SNPP_ATMS_L1A___1'
os.environ['PROVIDER_ID'] = 'SNPP'
os.environ['UPLOAD_DIR'] = '/tmp/snpp_upload_test_1'
os.environ['STAGING_BUCKET'] = 'am-uds-dev-cumulus-staging'
os.environ['VERIFY_SSL'] = 'false'
os.environ['DELETE_FILES'] = 'false'

from cumulus_lambda_functions.cumulus_upload_granules.upload_granules import UploadGranules
print(UploadGranules().start())