Commit

ADD: use username & password to get token from cognito (#61)
* feat: use username & password to get token from cognito

* chore: update changelog to match the version

* chore: change release order
wphyojpl authored Jul 25, 2022
1 parent 6e5d037 commit 96a9434
Showing 17 changed files with 410 additions and 82 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
@@ -5,10 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.0] - 2022-04-14
## [1.6.16] - 2022-07-25
### Added
- Added: use username & password to log in to Cognito to get the token
### Fixed

## [0.1.0] - 2022-04-14
### Added
- Added lambda for parsing metadata from Sounder SIPS L0 metadata files [#14](https://github.com/unity-sds/unity-data-services/issues/14)

### Fixed
- Pushed docker image to ghcr.io
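
The CognitoTokenRetriever imported by the new DapaClient below is not itself shown in this diff. As a rough sketch, the boto3 call such a retriever typically wraps looks like the following; the client ID handling and function name are illustrative assumptions, not code from this commit.

import boto3


def get_cognito_token(username: str, password: str, client_id: str) -> str:
    # USER_PASSWORD_AUTH must be enabled on the Cognito app client.
    cognito = boto3.client('cognito-idp')
    response = cognito.initiate_auth(
        AuthFlow='USER_PASSWORD_AUTH',
        AuthParameters={'USERNAME': username, 'PASSWORD': password},
        ClientId=client_id,
    )
    # The resulting access token is what gets sent as the DAPA Bearer token.
    return response['AuthenticationResult']['AccessToken']
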
103 changes: 103 additions & 0 deletions cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py
@@ -0,0 +1,103 @@
import json
import logging
import os

import requests

from cumulus_lambda_functions.lib.cognito_login.cognito_token_retriever import CognitoTokenRetriever
from cumulus_lambda_functions.lib.constants import Constants

LOGGER = logging.getLogger(__name__)


class DapaClient:
def __init__(self):
self.__token_retriever = CognitoTokenRetriever()
self.__token = None
self.__dapa_base_api = None
self.__get_dapa_base_api()
self.__verify_ssl = True

def with_verify_ssl(self, verify_ssl: bool):
self.__verify_ssl = verify_ssl
return self

def __get_dapa_base_api(self):
if Constants.DAPA_API_KEY not in os.environ:
raise ValueError(f'missing key: {Constants.DAPA_API_KEY}')
self.__dapa_base_api = os.environ.get(Constants.DAPA_API_KEY)
self.__dapa_base_api = self.__dapa_base_api[:-1] if self.__dapa_base_api.endswith('/') else self.__dapa_base_api
return self

def __get_token(self):
if self.__token is None:
self.__token = self.__token_retriever.start()
if self.__token is None:
raise ValueError('unable to retrieve DAPA token')
return self

def get_collection(self, collection_id: str):
"""
TODO need better endpoint to get exactly 1 collection
TODO pagination?
:param collection_id:
:return:
"""
LOGGER.debug(f'getting collection details for: {collection_id}')
self.__get_token()
header = {'Authorization': f'Bearer {self.__token}'}
dapa_collection_url = f'{self.__dapa_base_api}/am-uds-dapa/collections?limit=1000'
response = requests.get(url=dapa_collection_url, headers=header, verify=self.__verify_ssl)
if response.status_code > 400:
raise RuntimeError(
f'querying collections ends in error. status_code: {response.status_code}. url: {dapa_collection_url}. details: {response.text}')
collections_result = json.loads(response.text)
if 'features' not in collections_result:
raise RuntimeError(f'missing features in response. invalid response: response: {collections_result}')
collection_details = [each_collection for each_collection in collections_result['features'] if
collection_id == each_collection["id"]]
if len(collection_details) < 1:
raise RuntimeError(f'unable to find collection in DAPA')
return collection_details[0]

def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to=''):
"""
TODO: pagination. getting only 1st 1k item
:param collection_id:
:param limit:
:param offset:
:param date_from:
:param date_to:
:return:
"""
dapa_granules_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/{collection_id}/items?limit={limit}&offset={offset}'
if date_from != '' or date_to != '':
dapa_granules_api = f"{dapa_granules_api}&datetime={date_from if date_from != '' else '..'}/{date_to if date_to != '' else '..'}"
LOGGER.debug(f'dapa_granules_api: {dapa_granules_api}')
LOGGER.debug(f'getting granules for: {dapa_granules_api}')
self.__get_token()
header = {'Authorization': f'Bearer {self.__token}'}
response = requests.get(url=dapa_granules_api, headers=header, verify=self.__verify_ssl)
if response.status_code > 400:
raise RuntimeError(
f'querying granules ends in error. status_code: {response.status_code}. url: {dapa_granules_api}. details: {response.text}')
granules_result = json.loads(response.text)
if 'features' not in granules_result:
raise RuntimeError(f'missing features in response. invalid response: response: {granules_result}')
return granules_result['features']

def ingest_granules_w_cnm(self, cnm_ingest_body: dict) -> str:
dapa_ingest_cnm_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/'
LOGGER.debug(f'getting granules for: {dapa_ingest_cnm_api}')
self.__get_token()
header = {
'Authorization': f'Bearer {self.__token}',
'Content-Type': 'application/json',
}
response = requests.put(url=dapa_ingest_cnm_api, headers=header, verify=self.__verify_ssl,
data=json.dumps(cnm_ingest_body))
if response.status_code > 400:
raise RuntimeError(
f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
granules_result = response.text
return granules_result
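
A minimal usage sketch for the new client: it assumes Constants.DAPA_API_KEY resolves to the DAPA_API environment variable (the key previously read by the download/upload classes below) and that Cognito credentials are available to the token retriever. The gateway URL and collection ID are illustrative.

import os

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient

# Hypothetical gateway URL; a trailing slash is stripped by the client.
os.environ['DAPA_API'] = 'https://example.execute-api.us-west-2.amazonaws.com/dev/'

dapa_client = DapaClient().with_verify_ssl(False)
# Fetches up to 1000 collections, then filters client-side by ID.
collection = dapa_client.get_collection('SNDR_SNPP_ATMS_L0')
# Blank date bounds are substituted with '..', i.e. an open-ended range.
granules = dapa_client.get_granules('SNDR_SNPP_ATMS_L0', limit=100, offset=0,
                                    date_to='2022-07-25T00:00:00Z')
print(len(granules))
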
@@ -2,17 +2,14 @@
import logging
import os

import requests

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)


class DownloadGranules:
DAPA_API_KEY = 'DAPA_API'
UNITY_BEARER_TOKEN_KEY = 'UNITY_BEARER_TOKEN'
COLLECTION_ID_KEY = 'COLLECTION_ID'
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'

@@ -22,23 +19,19 @@ class DownloadGranules:
VERIFY_SSL_KEY = 'VERIFY_SSL'

def __init__(self):
self.__dapa_api = ''
self.__unity_bearer_token = ''
self.__collection_id = ''
self.__date_from = ''
self.__date_to = ''
self.__limit = 100
self.__limit = 1000
self.__download_dir = '/tmp'
self.__verify_ssl = True
self.__s3 = AwsS3()

def __set_props_from_env(self):
missing_keys = [k for k in [self.DAPA_API_KEY, self.COLLECTION_ID_KEY, self.DOWNLOAD_DIR_KEY, self.UNITY_BEARER_TOKEN_KEY] if k not in os.environ]
missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')

self.__dapa_api = os.environ.get(self.DAPA_API_KEY)
self.__unity_bearer_token = os.environ.get(self.UNITY_BEARER_TOKEN_KEY)
self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
@@ -52,25 +45,6 @@ def __set_props_from_env(self):
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
return self

def __generate_dapa_url(self):
self.__dapa_api = self.__dapa_api[:-1] if self.__dapa_api.endswith('/') else self.__dapa_api
dapa_granules_api = f'{self.__dapa_api}/am-uds-dapa/collections/{self.__collection_id}/items?limit={self.__limit}&offset=0'
if self.__date_from != '' or self.__date_to != '':
dapa_granules_api = f"{dapa_granules_api}&datetime={self.__date_from if self.__date_from != '' else '..'}/{self.__date_to if self.__date_to != '' else '..'}"
LOGGER.debug(f'dapa_granules_api: {dapa_granules_api}')
return dapa_granules_api

def __get_granules(self, dapa_granules_api): # TODO pagination if needed
LOGGER.debug(f'getting granules for: {dapa_granules_api}')
header = {'Authorization': f'Bearer {self.__unity_bearer_token}'}
response = requests.get(url=dapa_granules_api, headers=header, verify=self.__verify_ssl)
if response.status_code > 400:
raise RuntimeError(f'querying granules ends in error. status_code: {response.status_code}. url: {dapa_granules_api}. details: {response.text}')
granules_result = json.loads(response.text)
if 'features' not in granules_result:
raise RuntimeError(f'missing features in response. invalid response: response: {granules_result}')
return granules_result['features']

def __get_downloading_urls(self, granules_result: list):
if len(granules_result) < 1:
LOGGER.warning(f'cannot find any granules')
@@ -121,8 +95,8 @@ def start(self):
self.__set_props_from_env()
LOGGER.debug(f'creating download dir: {self.__download_dir}')
FileUtils.mk_dir_p(self.__download_dir)
dapa_granules_api = self.__generate_dapa_url()
granules_result = self.__get_granules(dapa_granules_api)
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
granules_result = dapa_client.get_granules(self.__collection_id, self.__limit, 0, self.__date_from, self.__date_to)
downloading_urls = self.__get_downloading_urls(granules_result)
error_list = []
for each in downloading_urls:
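
After this refactor, DownloadGranules no longer reads DAPA_API or UNITY_BEARER_TOKEN itself; the token now comes from DapaClient via the Cognito retriever. A sketch of the trimmed environment with illustrative values (the module path is assumed from the repository layout):

import os

from cumulus_lambda_functions.cumulus_download_granules.download_granules import DownloadGranules

# DAPA_API is still required at runtime, but by DapaClient rather than this
# class; UNITY_BEARER_TOKEN is replaced by the Cognito-based login.
os.environ['DAPA_API'] = 'https://example.execute-api.us-west-2.amazonaws.com/dev'
os.environ['COLLECTION_ID'] = 'SNDR_SNPP_ATMS_L0'
os.environ['DOWNLOAD_DIR'] = '/tmp/granules'

DownloadGranules().start()
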
@@ -7,14 +7,13 @@

import requests

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = logging.getLogger(__name__)


class UploadGranules:
DAPA_API_KEY = 'DAPA_API'
UNITY_BEARER_TOKEN_KEY = 'UNITY_BEARER_TOKEN'
COLLECTION_ID_KEY = 'COLLECTION_ID'
PROVIDER_ID_KEY = 'PROVIDER_ID'
UPLOAD_DIR_KEY = 'UPLOAD_DIR'
@@ -24,8 +23,6 @@ class UploadGranules:
DELETE_FILES_KEY = 'DELETE_FILES'

def __init__(self):
self.__dapa_api = ''
self.__unity_bearer_token = ''
self.__collection_id = ''
self.__collection_details = {}
self.__uploading_granules = []
@@ -38,13 +35,10 @@ def __init__(self):
self.__raw_files = []

def __set_props_from_env(self):
missing_keys = [k for k in [self.DAPA_API_KEY, self.COLLECTION_ID_KEY, self.PROVIDER_ID_KEY, self.UPLOAD_DIR_KEY, self.UNITY_BEARER_TOKEN_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.PROVIDER_ID_KEY, self.UPLOAD_DIR_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')

self.__dapa_api = os.environ.get(self.DAPA_API_KEY)
self.__dapa_api = self.__dapa_api[:-1] if self.__dapa_api.endswith('/') else self.__dapa_api
self.__unity_bearer_token = os.environ.get(self.UNITY_BEARER_TOKEN_KEY)
self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
@@ -56,25 +50,6 @@ def __set_props_from_env(self):
self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
return self

def __get_collection_stac(self):
LOGGER.debug(f'getting collection details for: {self.__collection_id}')
header = {'Authorization': f'Bearer {self.__unity_bearer_token}'}
dapa_collection_url = f'{self.__dapa_api}/am-uds-dapa/collections?limit=1000'
# TODO need better endpoint to get exactly 1 collection
# TODO pagination?
response = requests.get(url=dapa_collection_url, headers=header, verify=self.__verify_ssl)
if response.status_code > 400:
raise RuntimeError(f'querying collections ends in error. status_code: {response.status_code}. url: {dapa_collection_url}. details: {response.text}')
collections_result = json.loads(response.text)
if 'features' not in collections_result:
raise RuntimeError(f'missing features in response. invalid response: response: {collections_result}')
print(self.__collection_id)
collection_details = [each_collection for each_collection in collections_result['features'] if self.__collection_id == each_collection["id"]]
if len(collection_details) < 1:
raise RuntimeError(f'unable to find collection in DAPA')
self.__collection_details = collection_details[0]
return self

def __sort_granules(self):
file_regex_list = {k['type']: k['href'].split('___')[-1] for k in self.__collection_details['links'] if not k['title'].endswith('cmr.xml')}
granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction']
@@ -111,20 +86,6 @@ def __upload_granules(self, granule_assets: dict, granule_id: str):
href_dict['href'] = s3_url
return self

def __execute_dapa_cnm_ingestion(self, cnm_ingest_body: dict):
dapa_ingest_cnm_api = f'{self.__dapa_api}/am-uds-dapa/collections/'
LOGGER.debug(f'getting granules for: {dapa_ingest_cnm_api}')
header = {
'Authorization': f'Bearer {self.__unity_bearer_token}',
'Content-Type': 'application/json',
}
response = requests.put(url=dapa_ingest_cnm_api, headers=header, verify=self.__verify_ssl, data=json.dumps(cnm_ingest_body))
if response.status_code > 400:
raise RuntimeError(
f'querying granules ingestion ends in error. status_code: {response.status_code}. url: {dapa_ingest_cnm_api}. details: {response.text}')
granules_result = response.text
return granules_result

def start(self):
"""
@@ -139,7 +100,8 @@ def start(self):
self.__set_props_from_env()
LOGGER.debug(f'listing files recursively in dir: {self.__upload_dir}')
self.__raw_files = glob(f'{self.__upload_dir}/**/*', recursive=True)
self.__get_collection_stac()
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
self.__collection_details = dapa_client.get_collection(self.__collection_id)
on_disk_granules = self.__sort_granules()
LOGGER.debug(f'on_disk_granules: {on_disk_granules}')
dapa_body_granules = []
@@ -156,5 +118,5 @@ def start(self):
"features": dapa_body_granules
}
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
dapa_ingest_reuslt = self.__execute_dapa_cnm_ingestion(dapa_body)
return dapa_ingest_reuslt
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
return dapa_ingest_result
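
UploadGranules follows the same pattern: DAPA_API and UNITY_BEARER_TOKEN drop out of its required-key check, leaving the four keys validated above. A sketch with illustrative values (module path again assumed from the repository layout):

import os

from cumulus_lambda_functions.cumulus_upload_granules.upload_granules import UploadGranules

os.environ['DAPA_API'] = 'https://example.execute-api.us-west-2.amazonaws.com/dev'
os.environ['COLLECTION_ID'] = 'SNDR_SNPP_ATMS_L0'
os.environ['PROVIDER_ID'] = 'SNPP'
os.environ['UPLOAD_DIR'] = '/tmp/staging'
os.environ['STAGING_BUCKET'] = 'example-staging-bucket'

result = UploadGranules().start()
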
41 changes: 41 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_param_store.py
@@ -0,0 +1,41 @@
import logging
from typing import Union

from botocore.exceptions import ClientError

from cumulus_lambda_functions.lib.aws.aws_cred import AwsCred

LOGGER = logging.getLogger()


class AwsParamStore(AwsCred):
def __init__(self):
super().__init__()
self.__ssm_client = self.get_client('ssm')
self.__ssm = None

def set_param(self, key: str, val: str, encrypted: bool = True):
self.__ssm_client.put_parameter(Name=key, Value=val,
Type='SecureString' if encrypted else 'String',
Overwrite=True)

def get_param(self, param_name: str) -> Union[str, None]:
"""
Ref: https://stackoverflow.com/questions/46063138/how-can-i-import-the-boto3-ssm-parameternotfound-exception
on how to catch ParameterNotFound exception
        :param param_name: name of the parameter in Parameter Store
        :return: plain value of the parameter, or None on exception.
"""
try:
param_response = self.__ssm_client.get_parameter(Name=param_name, WithDecryption=True)
if 'ResponseMetadata' not in param_response or \
'HTTPStatusCode' not in param_response['ResponseMetadata'] or \
param_response['ResponseMetadata']['HTTPStatusCode'] != 200 or \
'Parameter' not in param_response or \
'Value' not in param_response['Parameter']:
return None
except ClientError:
LOGGER.exception('cannot get parameter store value for %s', param_name)
return None
return param_response['Parameter']['Value']
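
A short usage sketch for the new Parameter Store helper, assuming AwsCred can resolve AWS credentials; the parameter name and value are illustrative.

from cumulus_lambda_functions.lib.aws.aws_param_store import AwsParamStore

param_store = AwsParamStore()
# Stored as a SecureString by default; pass encrypted=False for a plain String.
param_store.set_param('/unity/cognito/password', 'example-password')
# Returns the decrypted value, or None when the parameter is missing or a
# ClientError is raised.
password = param_store.get_param('/unity/cognito/password')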