33 changes: 33 additions & 0 deletions cumulus_lambda_functions/cumulus_es_setup/es_setup.py
@@ -1,5 +1,8 @@
import os

import requests

from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

from mdps_ds_lib.lib.aws.es_abstract import ESAbstract
@@ -23,6 +26,36 @@ def __init__(self):
port=int(os.getenv('ES_PORT', '443'))
)

def setup_maap_daac_index(self):
stac_fast_version = '6.0.0'
url = f"https://raw.githubusercontent.com/stac-utils/stac-fastapi-elasticsearch-opensearch/refs/tags/v{stac_fast_version}/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py"
resp = requests.get(url)
resp.raise_for_status()

        code = resp.text
        # NOTE: executes the module fetched from the pinned stac-fastapi tag in an
        # isolated namespace so its ES_ITEMS_MAPPINGS definition can be reused
        namespace = {}
        exec(code, namespace)
        es_items_mappings = namespace["ES_ITEMS_MAPPINGS"]
LOGGER.debug(f'stac fast API es_items_mappings: {es_items_mappings}')
es_items_mappings['properties'] = {
**GranulesIndexMapping.percolator_mappings,
**es_items_mappings['properties'],
}
index_mapping = {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
},
"mappings": es_items_mappings
}
index_name = f'{GranulesIndexMapping.daac_percolator_name}--{stac_fast_version.replace(".", "-")}'
try:
self.__es.create_index(index_name, index_mapping)
self.__es.create_alias(index_name, GranulesIndexMapping.daac_percolator_name)
        except Exception:
            LOGGER.exception(f'failed to create index / alias for: {GranulesIndexMapping.daac_percolator_name}')
return self

def get_index_mapping(self, index_name: str):
if not hasattr(es_mappings, index_name):
raise ValueError(f'missing index_name: {index_name}')
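
For context, the index created above is a percolator index: each stored document holds a DAAC archive configuration plus a query that granule STAC documents are matched against. A minimal sketch of registering one such configuration, assuming `ss_query` is the percolator field defined in `GranulesIndexMapping.percolator_mappings` (it is the field `archive_index.py` percolates against); the client, endpoint, and all config values are illustrative placeholders:

```python
# Hypothetical registration of a DAAC archive config as a percolator query.
# `ss_query` is assumed to be the percolator-typed field; every value below
# is a placeholder, not a real topic, role, or collection.
from elasticsearch import Elasticsearch

es = Elasticsearch('https://localhost:9200')
percolator_doc = {
    # the stored query: fires whenever a granule STAC document matches it
    'ss_query': {'match': {'collection': 'EXAMPLE_COLLECTION___1'}},
    # DAAC routing config returned to the archiver on a match
    'daac_collection_name': 'EXAMPLE_DAAC_COLLECTION',
    'daac_data_version': '1',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:000000000000:example-topic',
    'daac_provider': 'example-provider',
    'daac_role_arn': 'arn:aws:iam::000000000000:role/example-role',
    'daac_role_session_name': 'example-session',
}
# writing through the alias lands in the current versioned index
es.index(index='uds_maap_percolator', document=percolator_doc)
```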
81 changes: 81 additions & 0 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -3,12 +3,14 @@
from time import sleep

import requests
from mdps_ds_lib.lib.cumulus_stac.item_transformer import ItemTransformer
from mdps_ds_lib.lib.utils.file_utils import FileUtils

from mdps_ds_lib.lib.aws.aws_s3 import AwsS3

from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers
from mdps_ds_lib.lib.utils.json_validator import JsonValidator
from pystac import Item
from mdps_ds_lib.stac_fast_api_client.sfa_client_factory import SFAClientFactory

from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
@@ -148,6 +150,85 @@ def send_to_daac(self, event: dict):
self.send_to_daac_internal(uds_cnm_json)
return

def __extract_files_maap(self, asset_dict, daac_config):
result_files = []
# https://github.com/podaac/cloud-notification-message-schema
for k, v in asset_dict.items():
# if v.roles[0]['type'] not in archiving_types:
# continue
temp = {
'type': v.roles[0],
'uri': self.revert_to_s3_url(v.href),
'name': os.path.basename(v.href),
'checksumType': 'md5',
'checksum': v.extra_fields['file:checksum'],
'size': v.extra_fields['file:size']
}
result_files.append(temp) # TODO remove missing md5?

if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
return result_files # TODO remove missing md5?
archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
result_files1 = []
for each_file in result_files:
if each_file['type'] not in archiving_types:
continue
file_extensions = archiving_types[each_file['type']]
if len(file_extensions) < 1:
result_files1.append(each_file) # TODO remove missing md5?
continue
temp_filename = each_file['name'].upper().strip()
if any([temp_filename.endswith(k.upper()) for k in file_extensions]):
result_files1.append(each_file) # TODO remove missing md5?
return result_files1
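
The filtering above implies a particular shape for `daac_config['archiving_types']`: a list of entries keyed by `data_type`, each with an optional `file_extension` list. An illustrative (assumed) config and its effect:

```python
# Assumed shape of the archiving_types config consumed by __extract_files_maap;
# the types and extensions are placeholders.
daac_config = {
    'archiving_types': [
        # keep 'data' files only when their names end in .nc or .tif
        {'data_type': 'data', 'file_extension': ['.nc', '.tif']},
        # no file_extension key: keep every file whose type is 'metadata'
        {'data_type': 'metadata'},
    ]
}
# With this config, {'type': 'data', 'name': 'granule1.nc', ...} is kept,
# while {'type': 'data', 'name': 'granule1.txt', ...} is filtered out.
```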

def send_to_daac_maap(self, granules_json):
daac_configs = self.__archive_index_logic.percolate_maap_document(granules_json)
if daac_configs is None or len(daac_configs) < 1:
LOGGER.debug(f'this granule is not configured for archival: {granules_json}')
return
granules_item = Item.from_dict(granules_json)
errors = []
for each_daac_config in daac_configs:
LOGGER.debug(f'working on {each_daac_config}')
result = JsonValidator(UdsArchiveConfigIndex.db_record_schema).validate(each_daac_config)
if result is not None:
                errors.append(f'each_daac_config does not have a valid schema. Please re-add the DAAC config: {result} for {each_daac_config}')
continue
try:
self.__sns.set_topic_arn(each_daac_config['daac_sns_topic_arn'])
daac_cnm_message = {
"collection": {
'name': each_daac_config['daac_collection_name'],
'version': each_daac_config['daac_data_version'],
},
"identifier": granules_item.id,
"submissionTime": f'{TimeUtils.get_current_time()}Z',
"provider": each_daac_config['daac_provider'],
"version": "1.6.0", # TODO this is hardcoded?
"product": {
"name": granules_item.id,
# "dataVersion": daac_config['daac_data_version'],
'files': self.__extract_files_maap(granules_item.assets, each_daac_config),
}
}
LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}')
self.__sns.set_external_role(each_daac_config['daac_role_arn'],
each_daac_config['daac_role_session_name']).publish_message(
json.dumps(daac_cnm_message), True)
                # NOTE: publishes to the first matching DAAC config and returns
                # immediately; any remaining configs are not processed
                return {
'archive_status': 'cnm_s_success',
'archive_error_message': '',
'archive_error_code': '',
}
except Exception as e:
                LOGGER.exception('failed during archival process')
return {
'archive_status': 'cnm_s_failed',
'archive_error_message': str(e),
}
return

def update_stac(self, cnm_notification_msg):
update_type = os.getenv('ARCHIVAL_STATUS_MECHANISM', '')
if not any([k for k in ['UDS', 'FAST_STAC'] if k == update_type]):
1 change: 1 addition & 0 deletions cumulus_lambda_functions/granules_to_es/granules_index_mapping.py
@@ -1,4 +1,5 @@
class GranulesIndexMapping:
daac_percolator_name = 'uds_maap_percolator'
archiving_keys = [
'archive_status', 'archive_error_message', 'archive_error_code'
]
24 changes: 24 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/archive_index.py
@@ -1,5 +1,6 @@
from copy import deepcopy

from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

from mdps_ds_lib.lib.utils.json_validator import JsonValidator
@@ -56,6 +57,29 @@ def __init__(self, es_url, es_port=443, es_type='AWS', use_ssl=True):
port=es_port)
self.__tenant, self.__venue = '', ''

def percolate_maap_document(self, document):
dsl = {
'size': 9999,
# '_source': ['ss_name', 'ss_type', 'ss_username'],
'query': {
'percolate': {
'field': 'ss_query',
'document': document,
}
},
# 'sort': [{'ss_name': {'order': 'asc'}}]
}
try:
percolated_result = self.__es.query(dsl, querying_index=GranulesIndexMapping.daac_percolator_name)
        except Exception as e:
            # opensearch errors expose an `error` attribute; use getattr so other
            # exception types do not raise AttributeError here
            if getattr(e, 'error', None) == 'resource_not_found_exception':
                LOGGER.debug(f'unable to find document: {document} on index: {GranulesIndexMapping.daac_percolator_name}')
                return None
            LOGGER.exception('error while percolating')
            raise e
percolated_result = [k['_source'] for k in percolated_result['hits']['hits']]
return percolated_result

def percolate_document(self, document_id):
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}'.lower().strip()
current_alias = self.__es.get_alias(write_alias_name)
26 changes: 26 additions & 0 deletions cumulus_lambda_functions/uds_api/granules_archive_api.py
@@ -1,6 +1,7 @@
import json
import os

from cumulus_lambda_functions.daac_archiver.daac_archiver_logic import DaacArchiverLogic
from cumulus_lambda_functions.uds_api.dapa.granules_dapa_query_es import GranulesDapaQueryEs
from cumulus_lambda_functions.uds_api.dapa.pagination_links_generator import PaginationLinksGenerator

@@ -21,6 +22,7 @@

from fastapi import APIRouter, HTTPException, Request

from cumulus_lambda_functions.uds_api.granules_api import StacGranuleModel
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())
@@ -143,6 +145,30 @@ async def dapa_archive_get_config(request: Request, collection_id: str):
return add_result['body']
raise HTTPException(status_code=add_result['statusCode'], detail=add_result['body'])

@router.post("/{collection_id}/archive/{granule_id}")
@router.post("/{collection_id}/archive/{granule_id}/")
async def archive_single_granule_dapa(request: Request, collection_id: str, granule_id: str, granule: StacGranuleModel):
authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
.get_instance(UDSAuthorizerFactory.cognito,
es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443'))
)
auth_info = FastApiUtils.get_authorization_info(request)
collection_identifier = UdsCollections.decode_identifier(collection_id)
if not authorizer.is_authorized_for_collection(DBConstants.read, collection_id,
auth_info['ldap_groups'],
collection_identifier.tenant,
collection_identifier.venue):
LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
raise HTTPException(status_code=403, detail=json.dumps({
'message': 'not authorized to execute this action'
}))
new_granule = granule.model_dump()
update_result = DaacArchiverLogic().send_to_daac_maap(new_granule)

    return update_result
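
A minimal sketch of exercising this endpoint with `requests`; the base URL, bearer token, and identifiers are placeholders, the router is assumed to be mounted under the collections path, and the body is a STAC item dict matching `StacGranuleModel`:

```python
import requests

# placeholders: substitute a real deployment URL, Cognito token, and IDs
base_url = 'https://uds.example.com/am-uds-dapa/collections'
stac_granule_json = {  # truncated STAC item; real items carry full assets, bbox, etc.
    'type': 'Feature',
    'stac_version': '1.0.0',
    'id': 'EXAMPLE_GRANULE',
    'properties': {'datetime': '2024-01-01T00:00:00Z'},
    'geometry': None,
    'links': [],
    'assets': {},
    'collection': 'URN:NASA:UNITY:tenant:venue:EXAMPLE___1',
}
resp = requests.post(
    f'{base_url}/URN:NASA:UNITY:tenant:venue:EXAMPLE___1/archive/EXAMPLE_GRANULE',
    headers={'Authorization': 'Bearer <token>'},
    json=stac_granule_json,
)
resp.raise_for_status()
```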


@router.put("/{collection_id}/archive/{granule_id}")
@router.put("/{collection_id}/archive/{granule_id}/")
async def archive_single_granule_dapa(request: Request, collection_id: str, granule_id: str):
22 changes: 22 additions & 0 deletions cumulus_lambda_functions/uds_api/system_admin_api.py
@@ -36,3 +36,25 @@ async def es_setup(request: Request, tenant: Union[str, None]=None, venue: Union
LOGGER.exception(f'')
raise HTTPException(status_code=500, detail=str(e))
return {'message': 'successful'}


@router.put("/maap_daac_config_setup")
@router.put("/maap_daac_config_setup/")
async def maap_daac_config_setup(request: Request, tenant: Union[str, None]=None, venue: Union[str, None]=None, group_names: Union[str, None]=None):
LOGGER.debug(f'started maap_daac_config_setup')
auth_info = FastApiUtils.get_authorization_info(request)
query_body = {
'tenant': tenant,
'venue': venue,
'ldap_group_names': group_names if group_names is None else [k.strip() for k in group_names.split(',')],
}
auth_crud = AuthCrud(auth_info, query_body)
is_admin_result = auth_crud.is_admin()
if is_admin_result['statusCode'] != 200:
raise HTTPException(status_code=is_admin_result['statusCode'], detail=is_admin_result['body'])
try:
SetupESIndexAlias().setup_maap_daac_index()
except Exception as e:
LOGGER.exception(f'')
raise HTTPException(status_code=500, detail=str(e))
return {'message': 'successful'}
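
A sketch of the corresponding one-time admin call that triggers `setup_maap_daac_index`; the base URL, path prefix, and token are placeholders:

```python
import requests

# placeholders: substitute a real deployment URL and an admin Cognito token
resp = requests.put(
    'https://uds.example.com/am-uds-dapa/admin/maap_daac_config_setup',
    headers={'Authorization': 'Bearer <admin-token>'},
)
print(resp.json())  # {'message': 'successful'} once the index and alias exist
```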
37 changes: 37 additions & 0 deletions cumulus_lambda_functions/uds_api/web_service.py
@@ -1,3 +1,6 @@
import httpx
from fastapi import Response

from fastapi.staticfiles import StaticFiles

from cumulus_lambda_functions.uds_api.fast_api_utils import FastApiUtils
@@ -64,6 +67,40 @@ async def get_open_api(request: Request):
default_open_api_doc['paths'].pop(k)
return app.openapi()


# NOTE: This is how you create a proxy in FastAPI.

# BACKEND_URL = 'http://localhost:8080/' # TODO make sure it ends with '/'
# @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
# async def proxy(full_path: str, request: Request):
# # Construct full target URL
# fast_api_path = full_path.replace(f'{api_base_prefix}/', '')
# target_url = f"{BACKEND_URL}{fast_api_path}"
# print(f'full_path = {full_path}')
# print(f'target_url = {target_url}')
# # Prepare the request
# method = request.method
# headers = dict(request.headers)
# body = await request.body()
#
# async with httpx.AsyncClient() as client:
# backend_response = await client.request(
# method,
# target_url,
# content=body,
# headers=headers,
# params=request.query_params
# )
#
# # Return the response from the backend
# return Response(
# content=backend_response.content,
# status_code=backend_response.status_code,
# headers=dict(backend_response.headers),
# )
#


# to make it work with Amazon Lambda, we create a handler object
handler = Mangum(app=app)
