ADD: use username & password to get token from cognito #61

Merged 3 commits on Jul 25, 2022
7 changes: 5 additions & 2 deletions CHANGELOG.md
@@ -5,10 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.0] - 2022-04-14
## [1.6.16] - 2022-07-25
### Added
- Use username & password to log in to Cognito to get the token
### Fixed

## [0.1.0] - 2022-04-14
### Added
- Added lambda for parsing metadata from Sounder SIPS L0 metadata files [#14](https://github.com/unity-sds/unity-data-services/issues/14)

### Fixed
- Pushed docker image to ghcr.io
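
For context on the changelog entry above, a minimal sketch of what logging in to Cognito with a username & password to obtain a token typically looks like (assuming boto3 and the USER_PASSWORD_AUTH flow; all IDs and credentials below are illustrative, not this PR's actual configuration):

import boto3

# Illustrative only: the real client ID and credentials come from the
# deployment's configuration (e.g. environment variables / Parameter Store).
cognito = boto3.client('cognito-idp', region_name='us-west-2')
auth_response = cognito.initiate_auth(
    ClientId='example-app-client-id',
    AuthFlow='USER_PASSWORD_AUTH',
    AuthParameters={'USERNAME': 'example-user', 'PASSWORD': 'example-password'},
)
# The resulting token is what DapaClient sends as 'Authorization: Bearer <token>'.
token = auth_response['AuthenticationResult']['AccessToken']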
Empty file.
103 changes: 103 additions & 0 deletions cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py
@@ -0,0 +1,103 @@
import json
import logging
import os

import requests

from cumulus_lambda_functions.lib.cognito_login.cognito_token_retriever import CognitoTokenRetriever
from cumulus_lambda_functions.lib.constants import Constants

LOGGER = logging.getLogger(__name__)


class DapaClient:
    def __init__(self):
        self.__token_retriever = CognitoTokenRetriever()
        self.__token = None
        self.__dapa_base_api = None
        self.__get_dapa_base_api()
        self.__verify_ssl = True

    def with_verify_ssl(self, verify_ssl: bool):
        self.__verify_ssl = verify_ssl
        return self

    def __get_dapa_base_api(self):
        if Constants.DAPA_API_KEY not in os.environ:
            raise ValueError(f'missing key: {Constants.DAPA_API_KEY}')
        self.__dapa_base_api = os.environ.get(Constants.DAPA_API_KEY)
        self.__dapa_base_api = self.__dapa_base_api[:-1] if self.__dapa_base_api.endswith('/') else self.__dapa_base_api
        return self

    def __get_token(self):
        if self.__token is None:
            self.__token = self.__token_retriever.start()
        if self.__token is None:
            raise ValueError('unable to retrieve DAPA token')
        return self

    def get_collection(self, collection_id: str):
        """
        TODO need better endpoint to get exactly 1 collection
        TODO pagination?
        :param collection_id:
        :return:
        """
        LOGGER.debug(f'getting collection details for: {collection_id}')
        self.__get_token()
        header = {'Authorization': f'Bearer {self.__token}'}
        dapa_collection_url = f'{self.__dapa_base_api}/am-uds-dapa/collections?limit=1000'
        response = requests.get(url=dapa_collection_url, headers=header, verify=self.__verify_ssl)
        if response.status_code >= 400:  # 4xx and 5xx responses are failures, including 400 itself
            raise RuntimeError(
                f'querying collections ends in error. status_code: {response.status_code}. url: {dapa_collection_url}. details: {response.text}')
        collections_result = json.loads(response.text)
        if 'features' not in collections_result:
            raise RuntimeError(f'missing features in response. invalid response: response: {collections_result}')
        collection_details = [each_collection for each_collection in collections_result['features'] if
                              collection_id == each_collection["id"]]
        if len(collection_details) < 1:
            raise RuntimeError(f'unable to find collection in DAPA: {collection_id}')
        return collection_details[0]

    def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to=''):
        """
        TODO: pagination. getting only 1st 1k items
        :param collection_id:
        :param limit:
        :param offset:
        :param date_from:
        :param date_to:
        :return:
        """
        dapa_granules_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/{collection_id}/items?limit={limit}&offset={offset}'
        if date_from != '' or date_to != '':
            dapa_granules_api = f"{dapa_granules_api}&datetime={date_from if date_from != '' else '..'}/{date_to if date_to != '' else '..'}"
        LOGGER.debug(f'getting granules for: {dapa_granules_api}')
        self.__get_token()
        header = {'Authorization': f'Bearer {self.__token}'}
        response = requests.get(url=dapa_granules_api, headers=header, verify=self.__verify_ssl)
        if response.status_code >= 400:
            raise RuntimeError(
                f'querying granules ends in error. status_code: {response.status_code}. url: {dapa_granules_api}. details: {response.text}')
        granules_result = json.loads(response.text)
        if 'features' not in granules_result:
            raise RuntimeError(f'missing features in response. invalid response: response: {granules_result}')
        return granules_result['features']

    def ingest_granules_w_cnm(self, cnm_ingest_body: dict) -> str:
        dapa_ingest_cnm_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/'
        LOGGER.debug(f'ingesting granules via: {dapa_ingest_cnm_api}')
        self.__get_token()
        header = {
            'Authorization': f'Bearer {self.__token}',
            'Content-Type': 'application/json',
        }
        response = requests.put(url=dapa_ingest_cnm_api, headers=header, verify=self.__verify_ssl,
                                data=json.dumps(cnm_ingest_body))
        if response.status_code >= 400:
            raise RuntimeError(
                f'granule ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
        return response.text
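
A quick usage sketch of the new client, grounded in the code above (the base URL and collection ID are illustrative; it assumes Constants.DAPA_API_KEY resolves to 'DAPA_API' as in the classes below, and CognitoTokenRetriever needs its own Cognito configuration, not shown here):

import os

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient

os.environ['DAPA_API'] = 'https://uds.example.test/api'  # read via Constants.DAPA_API_KEY
dapa_client = DapaClient().with_verify_ssl(False)
# Open-ended ranges are allowed: an empty date_from or date_to becomes '..'.
granules = dapa_client.get_granules(
    collection_id='EXAMPLE_COLLECTION___1',
    limit=10,
    offset=0,
    date_from='2022-01-01T00:00:00Z',
    date_to='',
)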
@@ -2,17 +2,14 @@
import logging
import os

import requests

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranules:
    DAPA_API_KEY = 'DAPA_API'
    UNITY_BEARER_TOKEN_KEY = 'UNITY_BEARER_TOKEN'
    COLLECTION_ID_KEY = 'COLLECTION_ID'
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'

@@ -22,23 +19,19 @@ class DownloadGranules:
    VERIFY_SSL_KEY = 'VERIFY_SSL'

    def __init__(self):
        self.__dapa_api = ''
        self.__unity_bearer_token = ''
        self.__collection_id = ''
        self.__date_from = ''
        self.__date_to = ''
        self.__limit = 100
        self.__limit = 1000
        self.__download_dir = '/tmp'
        self.__verify_ssl = True
        self.__s3 = AwsS3()

    def __set_props_from_env(self):
        missing_keys = [k for k in [self.DAPA_API_KEY, self.COLLECTION_ID_KEY, self.DOWNLOAD_DIR_KEY, self.UNITY_BEARER_TOKEN_KEY] if k not in os.environ]
        missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__dapa_api = os.environ.get(self.DAPA_API_KEY)
        self.__unity_bearer_token = os.environ.get(self.UNITY_BEARER_TOKEN_KEY)
        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
        self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
@@ -52,25 +45,6 @@ def __set_props_from_env(self):
        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        return self

    def __generate_dapa_url(self):
        self.__dapa_api = self.__dapa_api[:-1] if self.__dapa_api.endswith('/') else self.__dapa_api
        dapa_granules_api = f'{self.__dapa_api}/am-uds-dapa/collections/{self.__collection_id}/items?limit={self.__limit}&offset=0'
        if self.__date_from != '' or self.__date_to != '':
            dapa_granules_api = f"{dapa_granules_api}&datetime={self.__date_from if self.__date_from != '' else '..'}/{self.__date_to if self.__date_to != '' else '..'}"
        LOGGER.debug(f'dapa_granules_api: {dapa_granules_api}')
        return dapa_granules_api

    def __get_granules(self, dapa_granules_api):  # TODO pagination if needed
        LOGGER.debug(f'getting granules for: {dapa_granules_api}')
        header = {'Authorization': f'Bearer {self.__unity_bearer_token}'}
        response = requests.get(url=dapa_granules_api, headers=header, verify=self.__verify_ssl)
        if response.status_code > 400:
            raise RuntimeError(f'querying granules ends in error. status_code: {response.status_code}. url: {dapa_granules_api}. details: {response.text}')
        granules_result = json.loads(response.text)
        if 'features' not in granules_result:
            raise RuntimeError(f'missing features in response. invalid response: response: {granules_result}')
        return granules_result['features']

    def __get_downloading_urls(self, granules_result: list):
        if len(granules_result) < 1:
            LOGGER.warning(f'cannot find any granules')
@@ -121,8 +95,8 @@ def start(self):
        self.__set_props_from_env()
        LOGGER.debug(f'creating download dir: {self.__download_dir}')
        FileUtils.mk_dir_p(self.__download_dir)
        dapa_granules_api = self.__generate_dapa_url()
        granules_result = self.__get_granules(dapa_granules_api)
        dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
        granules_result = dapa_client.get_granules(self.__collection_id, self.__limit, 0, self.__date_from, self.__date_to)
        downloading_urls = self.__get_downloading_urls(granules_result)
        error_list = []
        for each in downloading_urls:
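
Net effect of this refactor: DownloadGranules no longer needs DAPA_API or UNITY_BEARER_TOKEN in its own environment contract; DapaClient fetches the token itself. A rough sketch of driving the downloader after this change (the module path and all values are assumptions for illustration; optional keys such as VERIFY_SSL keep their defaults):

import os

os.environ['COLLECTION_ID'] = 'EXAMPLE_COLLECTION___1'  # required
os.environ['DOWNLOAD_DIR'] = '/tmp/granules'            # required
os.environ['DAPA_API'] = 'https://uds.example.test/api' # consumed by DapaClient, not this class

# Module path assumed from the class name; not shown in this diff.
from cumulus_lambda_functions.download_granules.download_granules import DownloadGranules
DownloadGranules().start()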
@@ -7,14 +7,13 @@

import requests

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = logging.getLogger(__name__)


class UploadGranules:
    DAPA_API_KEY = 'DAPA_API'
    UNITY_BEARER_TOKEN_KEY = 'UNITY_BEARER_TOKEN'
    COLLECTION_ID_KEY = 'COLLECTION_ID'
    PROVIDER_ID_KEY = 'PROVIDER_ID'
    UPLOAD_DIR_KEY = 'UPLOAD_DIR'
@@ -24,8 +23,6 @@ class UploadGranules:
    DELETE_FILES_KEY = 'DELETE_FILES'

    def __init__(self):
        self.__dapa_api = ''
        self.__unity_bearer_token = ''
        self.__collection_id = ''
        self.__collection_details = {}
        self.__uploading_granules = []
@@ -38,13 +35,10 @@ def __init__(self):
        self.__raw_files = []

    def __set_props_from_env(self):
        missing_keys = [k for k in [self.DAPA_API_KEY, self.COLLECTION_ID_KEY, self.PROVIDER_ID_KEY, self.UPLOAD_DIR_KEY, self.UNITY_BEARER_TOKEN_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
        missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.PROVIDER_ID_KEY, self.UPLOAD_DIR_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__dapa_api = os.environ.get(self.DAPA_API_KEY)
        self.__dapa_api = self.__dapa_api[:-1] if self.__dapa_api.endswith('/') else self.__dapa_api
        self.__unity_bearer_token = os.environ.get(self.UNITY_BEARER_TOKEN_KEY)
        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
@@ -56,25 +50,6 @@ def __set_props_from_env(self):
        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
        return self

    def __get_collection_stac(self):
        LOGGER.debug(f'getting collection details for: {self.__collection_id}')
        header = {'Authorization': f'Bearer {self.__unity_bearer_token}'}
        dapa_collection_url = f'{self.__dapa_api}/am-uds-dapa/collections?limit=1000'
        # TODO need better endpoint to get exactly 1 collection
        # TODO pagination?
        response = requests.get(url=dapa_collection_url, headers=header, verify=self.__verify_ssl)
        if response.status_code > 400:
            raise RuntimeError(f'querying collections ends in error. status_code: {response.status_code}. url: {dapa_collection_url}. details: {response.text}')
        collections_result = json.loads(response.text)
        if 'features' not in collections_result:
            raise RuntimeError(f'missing features in response. invalid response: response: {collections_result}')
        print(self.__collection_id)
        collection_details = [each_collection for each_collection in collections_result['features'] if self.__collection_id == each_collection["id"]]
        if len(collection_details) < 1:
            raise RuntimeError(f'unable to find collection in DAPA')
        self.__collection_details = collection_details[0]
        return self

    def __sort_granules(self):
        file_regex_list = {k['type']: k['href'].split('___')[-1] for k in self.__collection_details['links'] if not k['title'].endswith('cmr.xml')}
        granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction']
@@ -111,20 +86,6 @@ def __upload_granules(self, granule_assets: dict, granule_id: str):
        href_dict['href'] = s3_url
        return self

    def __execute_dapa_cnm_ingestion(self, cnm_ingest_body: dict):
        dapa_ingest_cnm_api = f'{self.__dapa_api}/am-uds-dapa/collections/'
        LOGGER.debug(f'getting granules for: {dapa_ingest_cnm_api}')
        header = {
            'Authorization': f'Bearer {self.__unity_bearer_token}',
            'Content-Type': 'application/json',
        }
        response = requests.put(url=dapa_ingest_cnm_api, headers=header, verify=self.__verify_ssl, data=json.dumps(cnm_ingest_body))
        if response.status_code > 400:
            raise RuntimeError(
                f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
        granules_result = response.text
        return granules_result

    def start(self):
        """

@@ -139,7 +100,8 @@ def start(self):
        self.__set_props_from_env()
        LOGGER.debug(f'listing files recursively in dir: {self.__upload_dir}')
        self.__raw_files = glob(f'{self.__upload_dir}/**/*', recursive=True)
        self.__get_collection_stac()
        dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
        self.__collection_details = dapa_client.get_collection(self.__collection_id)
        on_disk_granules = self.__sort_granules()
        LOGGER.debug(f'on_disk_granules: {on_disk_granules}')
        dapa_body_granules = []
@@ -156,5 +118,5 @@
            "features": dapa_body_granules
        }
        LOGGER.debug(f'dapa_body_granules: {dapa_body}')
        dapa_ingest_reuslt = self.__execute_dapa_cnm_ingestion(dapa_body)
        return dapa_ingest_reuslt
        dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
        return dapa_ingest_result
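
UploadGranules gets the same treatment: collection lookup and CNM ingestion now go through DapaClient. A minimal sketch of the environment it requires after this change, per __set_props_from_env above (all values illustrative; DELETE_FILES and VERIFY_SSL stay optional):

import os

os.environ['COLLECTION_ID'] = 'EXAMPLE_COLLECTION___1'
os.environ['PROVIDER_ID'] = 'example-provider'
os.environ['UPLOAD_DIR'] = '/tmp/to_upload'
os.environ['STAGING_BUCKET'] = 'example-staging-bucket'
os.environ['DAPA_API'] = 'https://uds.example.test/api'  # consumed by DapaClient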
41 changes: 41 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_param_store.py
@@ -0,0 +1,41 @@
import logging
from typing import Union

from botocore.exceptions import ClientError

from cumulus_lambda_functions.lib.aws.aws_cred import AwsCred

LOGGER = logging.getLogger()


class AwsParamStore(AwsCred):
    def __init__(self):
        super().__init__()
        self.__ssm_client = self.get_client('ssm')
        self.__ssm = None

    def set_param(self, key: str, val: str, encrypted: bool = True):
        self.__ssm_client.put_parameter(Name=key, Value=val,
                                        Type='SecureString' if encrypted else 'String',
                                        Overwrite=True)

    def get_param(self, param_name: str) -> Union[str, None]:
        """
        Ref: https://stackoverflow.com/questions/46063138/how-can-i-import-the-boto3-ssm-parameternotfound-exception
        on how to catch the ParameterNotFound exception

        :param param_name: name of the parameter in Parameter Store
        :return: plaintext value of the parameter, or None if it cannot be retrieved
        """
        try:
            param_response = self.__ssm_client.get_parameter(Name=param_name, WithDecryption=True)
            if 'ResponseMetadata' not in param_response or \
                    'HTTPStatusCode' not in param_response['ResponseMetadata'] or \
                    param_response['ResponseMetadata']['HTTPStatusCode'] != 200 or \
                    'Parameter' not in param_response or \
                    'Value' not in param_response['Parameter']:
                return None
        except ClientError:
            LOGGER.exception('cannot get parameter store value for %s', param_name)
            return None
        return param_response['Parameter']['Value']
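
A minimal usage sketch of AwsParamStore (assuming AWS credentials are available via AwsCred; the parameter name is illustrative — in this PR's context it would typically hold Cognito login details for the token retriever):

from cumulus_lambda_functions.lib.aws.aws_param_store import AwsParamStore

param_store = AwsParamStore()
# SecureString by default; get_param decrypts on read.
param_store.set_param('/example/cognito/password', 's3cr3t', encrypted=True)
password = param_store.get_param('/example/cognito/password')
if password is None:
    print('parameter missing or unreadable')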
Empty file.