feat: Adding DAAC archive config #396

Merged: 20 commits, Jul 22, 2024
Empty file.
13 changes: 13 additions & 0 deletions cumulus_lambda_functions/daac_archiver/lambda_function.py
@@ -0,0 +1,13 @@
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
"""
:param event:
:param context:
:return:
{'Records': [{'messageId': '6ff7c6fd-4053-4ab4-bc12-c042be1ed275', 'receiptHandle': 'AQEBYASiFPjQT5JBI2KKCTF/uQhHfJt/tHhgucslQQdvkNVxcXCNi2E5Ux4U9N0eu7RfvlnvtycjUh0gdL7jIeoyH+VRKSF61uAJuT4p31BsNe0GYu49N9A6+kxjP/RrykR7ZofmQRdHToX1ugRc76SMRic4H/ZZ89YAHA2QeomJFMrYywIxlk8OAzYaBf2dQI7WexjY5u1CW00XNMbTGyTo4foVPxcSn6bdFpfgxW/L7yJMX/0YQvrA9ruiuQ+lrui+6fWYh5zEk3f5v1bYtUQ6DtyyfbtMHZQJTJpUlWAFRzzN+3melilH7FySyOGDXhPb0BOSzmdKq9wBbfLW/YPb7l99ejq4GfRfj8LyI4EtB96vTeUw4LCgUqbZcBrxbGBLUXMacweh+gCjHav9ylqr2SeOiqG3vWPq9pwFYQIDqNE=', 'body': '{\n "Type" : "Notification",\n "MessageId" : "33e1075a-435c-5217-a33d-59fae85e19b2",\n "TopicArn" : "arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester",\n "Subject" : "Amazon S3 Notification",\n "Message" : "{\\"Service\\":\\"Amazon S3\\",\\"Event\\":\\"s3:TestEvent\\",\\"Time\\":\\"2024-04-22T18:13:22.416Z\\",\\"Bucket\\":\\"uds-sbx-cumulus-staging\\",\\"RequestId\\":\\"DQ4T0GRVFPSX45C9\\",\\"HostId\\":\\"gHBFnYNmfnGDZBmqoQwA3RScjtjBk5lr426moGxu8IDpe5UhWAqNTxHqilWBoPN1njzIrzNrf8c=\\"}",\n "Timestamp" : "2024-04-22T18:13:22.434Z",\n "SignatureVersion" : "1",\n "Signature" : "RvSxqpU7J7CCJXbin9cXqTxzjMjgAUFtk/n454mTMcOe5x3Ay1w4AHfzyeYQCFBdLHNBa8n3OdMDoDlJqyVQMb8k+nERaiZWN2oqFVDRqT9pqSr89b+4FwlhPv6TYy2pBa/bgjZ4cOSYsey1uSQ3hjl0idfssvuV5cCRxQScbA+yu8Gcv9K7Oqgy01mC0sDHiuPIifhFXxupG5ygbjqoHIB+1gdMEbBwyixoY5GOpHM/O2uHNF+dJDjax1WMxQ2FzVjiFeCa+tNcjovF059+tx2v1YmDq/kEAFrN6DAtP6R4zKag62P9jkvjU/wHYJ2jjXmZAqoG+nuzAo24HiZPSw==",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester:76cbefa1-addf-45c2-97e1-ae16986b195b"\n}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1713809602474', 'SenderId': 'AIDAIYLAVTDLUXBIEIX46', 'ApproximateFirstReceiveTimestamp': '1713809602483'}, 'messageAttributes': {}, 'md5OfBody': 'c6d06d1b742ad5bd2cfe5f542640aad2', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester', 'awsRegion': 'us-west-2'}]}
"""
LambdaLoggerGenerator.remove_default_handlers()
print('To be implemented later')
return
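The docstring above embeds a sample SQS event whose record body is an SNS envelope carrying a JSON-encoded Message. For reference only (the handler itself is still a stub), a minimal sketch of unwrapping that nested payload with the standard json module could look like the following; the helper name is hypothetical and not part of this PR:

import json


def extract_inner_message(event: dict) -> dict:
    # The SQS record body is an SNS envelope; its 'Message' field is itself a JSON string.
    sns_envelope = json.loads(event['Records'][0]['body'])
    return json.loads(sns_envelope['Message'])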
24 changes: 24 additions & 0 deletions cumulus_lambda_functions/granules_to_es/granules_index_mapping.py
@@ -1,4 +1,28 @@
class GranulesIndexMapping:
percolator_mappings = {
"daac_collection_name": {
"type": "keyword"
},
"daac_data_version": {
"type": "keyword"
},
"archiving_types": {
"type": "object",
"properties": {
"data_type": {"type": "keyword"},
"file_extension": {"type": "keyword"},
}
},
"daac_sns_topic_arn": {
"type": "keyword"
},
"ss_query": {
"type": "percolator"
},
"ss_username": {
"type": "keyword"
},
}
stac_mappings = {
"event_time": {"type": "long"},
"type": {"type": "keyword"},
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@
from cumulus_lambda_functions.lib.json_validator import JsonValidator

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

15 changes: 15 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_sns.py
@@ -32,3 +32,18 @@ def publish_message(self, msg_str: str):
# MessageGroupId='string'
)
return response

def create_sqs_subscription(self, sqs_arn):
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/subscribe.html
if self.__topic_arn == '':
raise ValueError('missing topic arn to create subscription')
response = self.__sns_client.subscribe(
TopicArn=self.__topic_arn,
Protocol='sqs',
Endpoint=sqs_arn, # For the sqs protocol, the endpoint is the ARN of an Amazon SQS queue.
# Attributes={
# 'string': 'string'
# },
ReturnSubscriptionArn=True # if the API request parameter ReturnSubscriptionArn is true, then the value is always the subscription ARN, even if the subscription requires confirmation.
)
return response['SubscriptionArn']
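One operational note, not part of this diff: an SQS subscription only delivers messages if the queue's access policy allows the SNS topic to call sqs:SendMessage. A hedged sketch of that companion setup with boto3 is below; the queue URL, queue ARN, and topic ARN are placeholder values.

import json

import boto3

# Placeholder identifiers, shown only to illustrate the required queue policy.
queue_url = 'https://sqs.us-west-2.amazonaws.com/000000000000/example-queue'
queue_arn = 'arn:aws:sqs:us-west-2:000000000000:example-queue'
topic_arn = 'arn:aws:sns:us-west-2:000000000000:example-topic'

policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'sns.amazonaws.com'},
        'Action': 'sqs:SendMessage',
        'Resource': queue_arn,
        'Condition': {'ArnEquals': {'aws:SourceArn': topic_arn}},
    }],
}
# Attach the policy so the topic subscribed via create_sqs_subscription can deliver to the queue.
boto3.client('sqs').set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'Policy': json.dumps(policy)},
)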
3 changes: 3 additions & 0 deletions cumulus_lambda_functions/lib/aws/es_abstract.py
@@ -6,6 +6,9 @@

class ESAbstract(ABC):
@abstractmethod
def migrate_index_data(self, old_index, new_index):
return
@abstractmethod
def create_index(self, index_name, index_body):
return

28 changes: 25 additions & 3 deletions cumulus_lambda_functions/lib/aws/es_middleware.py
@@ -16,7 +16,8 @@ def __init__(self, index, base_url, port=443) -> None:
raise ValueError(f'index or base_url is None')
self.__index = index
base_url = base_url.replace('https://', '') # hide https
self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port}])
# https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch
self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port, 'use_ssl': True}])

def __validate_index(self, index):
if index is not None:
@@ -45,6 +46,25 @@ def __check_errors_for_bulk(self, index_result):
LOGGER.exception('failed to add some items. details: {}'.format(err_list))
return err_list

def migrate_index_data(self, old_index, new_index, remove_old_data=True):
if not self.has_index(old_index) or not self.has_index(new_index):
raise ValueError(f'at least one of the indices does not exist: [{old_index}, {new_index}]')
result = self._engine.reindex(
body={
"source": {
"index": old_index,
# "query": {
# "match_all": {}
# }
},
"dest": {
"index": new_index
}
}
)
if remove_old_data:
    self.delete_by_query({'query': {'match_all': {}}}, old_index)
return result

def create_index(self, index_name, index_body):
result = self._engine.indices.create(index=index_name, body=index_body, include_type_name=False)
if 'acknowledged' not in result:
@@ -66,6 +86,7 @@ def swap_index_for_alias(self, alias_name, old_index_name, new_index_name):
try:
temp_result = self._engine.indices.delete_alias(index=old_index_name, name=alias_name)
except NotFoundError as ee:
LOGGER.exception(f'error while unlinking {old_index_name} from {alias_name}')
temp_result = {}
result = self.create_alias(new_index_name, alias_name)
return result
@@ -91,6 +112,7 @@ def delete_index(self, index_name):
return result['acknowledged']

def index_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
# https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch.bulk
doc_dict = self.__get_doc_dict(docs, doc_ids, doc_dict)
body = []
for k, v in doc_dict.items():
@@ -115,9 +137,9 @@ def index_one(self, doc, doc_id, index=None):
body=doc, doc_type=DEFAULT_TYPE, id=doc_id)
LOGGER.info('indexed. result: {}'.format(index_result))
pass
except:
except Exception as e:
LOGGER.exception('cannot add a new index with id: {} for index: {}'.format(doc_id, index))
return None
raise e
return self

def update_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
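A minimal usage sketch of the new migrate_index_data helper, assuming an instance is obtained through ESFactory the same way other code in this PR does; the host and index names are placeholders:

from cumulus_lambda_functions.lib.aws.es_factory import ESFactory

# Placeholder endpoint and index names, for illustration only.
es = ESFactory().get_instance('AWS', index='example_index',
                              base_url='example-domain.us-west-2.es.amazonaws.com', port=443)
# Copies every document from the old index into the new one via _reindex,
# then (with remove_old_data left as True) clears the old index with delete_by_query.
reindex_result = es.migrate_index_data('example_index__v01', 'example_index__v02')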
95 changes: 95 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/archive_index.py
@@ -0,0 +1,95 @@
from cumulus_lambda_functions.lib.json_validator import JsonValidator

from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants

from cumulus_lambda_functions.lib.aws.es_abstract import ESAbstract

from cumulus_lambda_functions.lib.aws.es_factory import ESFactory


class UdsArchiveConfigIndex:
basic_schema = {
'type': 'object',
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
'properties': {
'daac_collection_id': {'type': 'string'},
'daac_sns_topic_arn': {'type': 'string'},
'daac_data_version': {'type': 'string'},
'collection': {'type': 'string'},
'ss_username': {'type': 'string'},
'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
}
}
def __init__(self, es_url, es_port=443):
self.__es: ESAbstract = ESFactory().get_instance('AWS',
index='TODO',
base_url=es_url,
port=es_port)
self.__tenant, self.__venue = '', ''

def set_tenant_venue(self, tenant, venue):
self.__tenant, self.__venue = tenant, venue
return self

def get_config(self, collection_id, username=None):
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
conditions = [{"term": {"collection": {"value": collection_id}}}]
if username is not None:
conditions.append({"term": {"ss_username": {"value": username}}})
result = self.__es.query({
'size': 9999,
'query': {
'bool': {
'must': conditions
}
}
}, read_alias_name)
return [k['_source'] for k in result['hits']['hits']]

def add_new_config(self, ingesting_dict: dict):
result = JsonValidator(self.basic_schema).validate(ingesting_dict)
if result is not None:
raise ValueError(f'input ingesting_dict has basic_schema validation errors: {result}')
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
ingesting_dict['daac_collection_name'] = ingesting_dict.pop('daac_collection_id')
ingesting_dict['ss_query'] = {
"bool": {
"must": [{
"term": {"collection": {"value": ingesting_dict['collection']} }
}]
}
}
result = self.__es.index_one(ingesting_dict, f"{ingesting_dict['daac_collection_name']}__{ingesting_dict['collection']}", index=write_alias_name)
return

def delete_config(self, collection_id, daac_collection_id):
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
self.__es.delete_by_query({
'size': 9999,
'query': {
'bool': {
'must': [
{ "term": {"collection": {"value": collection_id}} },
{"term": {"daac_collection_name": {"value": daac_collection_id}}},
]
}
}
}, write_alias_name)
return

def update_config(self, collection_id, daac_collection_id, daac_sns_topic_arn, username):
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
result = self.__es.update_one({
"collection": collection_id,
"daac_collection_name": daac_collection_id,
"daac_sns_topic_arn": daac_sns_topic_arn,
"ss_query": {
"bool": {
"must": [{
"term": {"collection": {"value": collection_id}}
}]
}
},
"ss_username": username,
}, f'{daac_collection_id}__{collection_id}', index=write_alias_name)
return
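A hedged usage sketch of the new UdsArchiveConfigIndex, showing a payload that satisfies basic_schema; every value below (ES host, tenant, venue, collection and DAAC identifiers) is a placeholder, not something defined by this PR:

# Placeholder endpoint, tenant, and venue for illustration only.
archive_index = UdsArchiveConfigIndex('example-domain.us-west-2.es.amazonaws.com', es_port=443)
archive_index.set_tenant_venue('example_tenant', 'dev')
archive_index.add_new_config({
    'daac_collection_id': 'EXAMPLE_DAAC_COLLECTION',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:000000000000:example-daac-topic',
    'daac_data_version': '1.0',
    'collection': 'URN:NASA:UNITY:example_tenant:dev:EXAMPLE___001',
    'ss_username': 'example_user',
    'archiving_types': [{'data_type': 'data', 'file_extension': ['.nc', '.json']}],
})
# Later, retrieve the stored config(s) for that collection:
configs = archive_index.get_config('URN:NASA:UNITY:example_tenant:dev:EXAMPLE___001', username='example_user')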
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ def __init__(self):
# "event_time": {"type": "long"}
# }
self.__default_fields = GranulesIndexMapping.stac_mappings
self.__ss_fields = GranulesIndexMapping.percolator_mappings

@staticmethod
def to_es_bbox(bbox_array):
@@ -68,8 +69,13 @@ def default_fields(self, val):
self.__default_fields = val
return

def __add_custom_mappings(self, es_mapping: dict):
def __add_custom_mappings(self, es_mapping: dict, include_perc=False):
potential_ss_fields = {} if not include_perc else self.__ss_fields
customized_es_mapping = deepcopy(self.default_fields)
customized_es_mapping = {
**potential_ss_fields,
**self.default_fields,
}
customized_es_mapping['properties']['properties'] = {
**es_mapping,
**self.default_fields['properties']['properties'],
@@ -102,6 +108,7 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
tenant = tenant.replace(':', '--')
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()

current_alias = self.__es.get_alias(write_alias_name)
# {'meta_labels_v2': {'aliases': {'metadata_labels': {}}}}
current_index_name = f'{write_alias_name}__v0' if current_alias == {} else [k for k in current_alias.keys()][0]
@@ -122,6 +129,28 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
self.__es.create_index(new_index_name, index_mapping)
self.__es.create_alias(new_index_name, read_alias_name)
self.__es.swap_index_for_alias(write_alias_name, current_index_name, new_index_name)

write_perc_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}_perc'.lower().strip()
read_perc_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}_perc'.lower().strip()
current_perc_alias = self.__es.get_alias(write_perc_alias_name)
current_perc_index_name = f'{write_alias_name}_perc__v0' if current_perc_alias == {} else [k for k in current_perc_alias.keys()][0]
new_perc_index_name = f'{DBConstants.granules_index_prefix}_{tenant}_{tenant_venue}_perc__v{new_version:02d}'.lower().strip()
customized_perc_es_mapping = self.__add_custom_mappings(es_mapping, True)
LOGGER.debug(f'customized_perc_es_mapping: {customized_perc_es_mapping}')
perc_index_mapping = {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
},
"mappings": {
"dynamic": "strict",
"properties": customized_perc_es_mapping,
}
}
self.__es.create_index(new_perc_index_name, perc_index_mapping)
self.__es.create_alias(new_perc_index_name, read_perc_alias_name)
self.__es.swap_index_for_alias(write_perc_alias_name, current_perc_index_name, new_perc_index_name)
self.__es.migrate_index_data(current_perc_index_name, new_perc_index_name)
return

def get_latest_index(self, tenant, tenant_venue):
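For background (not part of this diff): because ss_query is mapped with type percolator, the new *_perc index can be searched in reverse, matching an incoming granule document against every stored archive config. A hedged sketch of such a lookup, assuming an ESAbstract instance es obtained as in the earlier sketches and the alias naming convention introduced above, with placeholder tenant and collection values:

# Elasticsearch percolate query: returns every stored config whose ss_query matches this document.
percolate_body = {
    'query': {
        'percolate': {
            'field': 'ss_query',  # the field mapped as "percolator" in this PR
            'document': {
                'collection': 'URN:NASA:UNITY:example_tenant:dev:EXAMPLE___001',
            },
        }
    }
}
read_perc_alias = 'example_read_prefix_example_tenant_dev_perc'  # placeholder for the real read alias
matching_configs = [hit['_source'] for hit in es.query(percolate_body, read_perc_alias)['hits']['hits']]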
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
from cumulus_lambda_functions.lib.metadata_extraction.echo_metadata import EchoMetadata
from cumulus_lambda_functions.lib.time_utils import TimeUtils
from cumulus_lambda_functions.metadata_stac_generate_cmr.stac_input_metadata import StacInputMetadata
from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

Expand Down
Empty file.
13 changes: 13 additions & 0 deletions cumulus_lambda_functions/mock_daac/lambda_function.py
@@ -0,0 +1,13 @@
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
"""
:param event:
:param context:
:return:
{'Records': [{'messageId': '6ff7c6fd-4053-4ab4-bc12-c042be1ed275', 'receiptHandle': 'AQEBYASiFPjQT5JBI2KKCTF/uQhHfJt/tHhgucslQQdvkNVxcXCNi2E5Ux4U9N0eu7RfvlnvtycjUh0gdL7jIeoyH+VRKSF61uAJuT4p31BsNe0GYu49N9A6+kxjP/RrykR7ZofmQRdHToX1ugRc76SMRic4H/ZZ89YAHA2QeomJFMrYywIxlk8OAzYaBf2dQI7WexjY5u1CW00XNMbTGyTo4foVPxcSn6bdFpfgxW/L7yJMX/0YQvrA9ruiuQ+lrui+6fWYh5zEk3f5v1bYtUQ6DtyyfbtMHZQJTJpUlWAFRzzN+3melilH7FySyOGDXhPb0BOSzmdKq9wBbfLW/YPb7l99ejq4GfRfj8LyI4EtB96vTeUw4LCgUqbZcBrxbGBLUXMacweh+gCjHav9ylqr2SeOiqG3vWPq9pwFYQIDqNE=', 'body': '{\n "Type" : "Notification",\n "MessageId" : "33e1075a-435c-5217-a33d-59fae85e19b2",\n "TopicArn" : "arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester",\n "Subject" : "Amazon S3 Notification",\n "Message" : "{\\"Service\\":\\"Amazon S3\\",\\"Event\\":\\"s3:TestEvent\\",\\"Time\\":\\"2024-04-22T18:13:22.416Z\\",\\"Bucket\\":\\"uds-sbx-cumulus-staging\\",\\"RequestId\\":\\"DQ4T0GRVFPSX45C9\\",\\"HostId\\":\\"gHBFnYNmfnGDZBmqoQwA3RScjtjBk5lr426moGxu8IDpe5UhWAqNTxHqilWBoPN1njzIrzNrf8c=\\"}",\n "Timestamp" : "2024-04-22T18:13:22.434Z",\n "SignatureVersion" : "1",\n "Signature" : "RvSxqpU7J7CCJXbin9cXqTxzjMjgAUFtk/n454mTMcOe5x3Ay1w4AHfzyeYQCFBdLHNBa8n3OdMDoDlJqyVQMb8k+nERaiZWN2oqFVDRqT9pqSr89b+4FwlhPv6TYy2pBa/bgjZ4cOSYsey1uSQ3hjl0idfssvuV5cCRxQScbA+yu8Gcv9K7Oqgy01mC0sDHiuPIifhFXxupG5ygbjqoHIB+1gdMEbBwyixoY5GOpHM/O2uHNF+dJDjax1WMxQ2FzVjiFeCa+tNcjovF059+tx2v1YmDq/kEAFrN6DAtP6R4zKag62P9jkvjU/wHYJ2jjXmZAqoG+nuzAo24HiZPSw==",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester:76cbefa1-addf-45c2-97e1-ae16986b195b"\n}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1713809602474', 'SenderId': 'AIDAIYLAVTDLUXBIEIX46', 'ApproximateFirstReceiveTimestamp': '1713809602483'}, 'messageAttributes': {}, 'md5OfBody': 'c6d06d1b742ad5bd2cfe5f542640aad2', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester', 'awsRegion': 'us-west-2'}]}
"""
LambdaLoggerGenerator.remove_default_handlers()
print('To be implemented later')
return
2 changes: 1 addition & 1 deletion cumulus_lambda_functions/uds_api/custom_meta_admin_api.py
@@ -3,7 +3,7 @@
from cumulus_lambda_functions.cumulus_es_setup.es_setup import SetupESIndexAlias
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.uds_api.dapa.auth_crud import AuthCrud, AuthDeleteModel, AuthListModel, AuthAddModel
from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.uds_api.fast_api_utils import FastApiUtils
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants
from fastapi import APIRouter, HTTPException, Request, Response