Skip to content

Commit

Permalink
feat: Adding DAAC archive config (#396)
Browse files Browse the repository at this point in the history
* chore: method to subscribe to sns from sqs

* feat: adding infrastructure for archive daac

* feat: adding percolator

* feat: add method to migrate data

* feat: add migration logic to the real code

* chore: move granules index to correct location

* feat: (in progress) adding daac config crud ops

* feat: finished adding CRUDS for daac config

* fix: need to check whether the user is authorized for the current collection + set tenant & venue for DB

* fix: updating errors based on testcase

* fix: adding log statement

* fix: mistaken perc alias v. normal alias

* fix: saved searches are not in the correct place in the mapping

* chore: adding log statement to see the problem

* fix: add it at the correct place

* fix: add test case + update errors based on those

* fix: add missing columns for daac archiving

* fix: file extension can be an array

* fix: update testcase + fix bugs

* fix: update test case
  • Loading branch information
wphyojpl authored Jul 22, 2024
1 parent 489e6da commit 7844796
Show file tree
Hide file tree
Showing 28 changed files with 964 additions and 10 deletions.
Empty file.
13 changes: 13 additions & 0 deletions cumulus_lambda_functions/daac_archiver/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
    """AWS Lambda entry point for the DAAC archiver (placeholder).

    Strips the default log handlers installed by the Lambda runtime and
    reports that the archiving logic is not implemented yet.

    :param event: incoming SQS event wrapping an SNS notification
        (``{'Records': [...]}``); a full sample payload is preserved in the
        commit history for this file.
    :param context: Lambda runtime context object (unused).
    :return: None
    """
    # Remove AWS's pre-installed root handlers so later logger
    # configuration by this codebase takes effect.
    LambdaLoggerGenerator.remove_default_handlers()
    print('To be implemented later')
    return
24 changes: 24 additions & 0 deletions cumulus_lambda_functions/granules_to_es/granules_index_mapping.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
class GranulesIndexMapping:
percolator_mappings = {
"daac_collection_name": {
"type": "keyword"
},
"daac_data_version": {
"type": "keyword"
},
"archiving_types": {
"type": "object",
"properties": {
"data_type": {"type": "keyword"},
"file_extension": {"type": "keyword"},
}
},
"daac_sns_topic_arn": {
"type": "keyword"
},
"ss_query": {
"type": "percolator"
},
"ss_username": {
"type": "keyword"
},
}
stac_mappings = {
"event_time": {"type": "long"},
"type": {"type": "keyword"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from cumulus_lambda_functions.lib.json_validator import JsonValidator

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

Expand Down
15 changes: 15 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_sns.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,18 @@ def publish_message(self, msg_str: str):
# MessageGroupId='string'
)
return response

def create_sqs_subscription(self, sqs_arn):
    """Subscribe an SQS queue to this SNS topic.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/subscribe.html

    :param sqs_arn: ARN of the Amazon SQS queue to subscribe. For the
        'sqs' protocol, the subscription endpoint is the queue's ARN.
    :return: the subscription ARN (str)
    :raises ValueError: when no topic ARN has been configured on this instance
    """
    if self.__topic_arn == '':
        # Bug fix: the original message ("... to publish message") was
        # copy-pasted from publish_message(); this guard protects
        # subscription creation.
        raise ValueError('missing topic arn to create subscription')
    response = self.__sns_client.subscribe(
        TopicArn=self.__topic_arn,
        Protocol='sqs',
        Endpoint=sqs_arn,  # For the sqs protocol, the endpoint is the ARN of an Amazon SQS queue.
        # Attributes={
        #     'string': 'string'
        # },
        # ReturnSubscriptionArn=True: the response always carries the
        # subscription ARN, even if the subscription requires confirmation.
        ReturnSubscriptionArn=True
    )
    return response['SubscriptionArn']
3 changes: 3 additions & 0 deletions cumulus_lambda_functions/lib/aws/es_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

class ESAbstract(ABC):
@abstractmethod
def migrate_index_data(self, old_index, new_index, remove_old_data=True):
    """Copy all documents from ``old_index`` into ``new_index``.

    Consistency fix: the concrete implementation (ESMiddleware) accepts a
    ``remove_old_data`` keyword; the abstract signature now matches it
    (backward-compatible: the new parameter has a default).

    :param old_index: name of the source index (must exist)
    :param new_index: name of the destination index (must exist)
    :param remove_old_data: when True, purge the source index after copying
    :return: implementation-defined reindex result
    """
    return
@abstractmethod
def create_index(self, index_name, index_body):
return

Expand Down
28 changes: 25 additions & 3 deletions cumulus_lambda_functions/lib/aws/es_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def __init__(self, index, base_url, port=443) -> None:
raise ValueError(f'index or base_url is None')
self.__index = index
base_url = base_url.replace('https://', '') # hide https
self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port}])
# https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch
self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port, 'use_ssl': True}])

def __validate_index(self, index):
if index is not None:
Expand Down Expand Up @@ -45,6 +46,25 @@ def __check_errors_for_bulk(self, index_result):
LOGGER.exception('failed to add some items. details: {}'.format(err_list))
return err_list

def migrate_index_data(self, old_index, new_index, remove_old_data=True):
    """Reindex every document from ``old_index`` into ``new_index``.

    Uses the Elasticsearch ``_reindex`` API to copy documents, then (by
    default) deletes all documents from the source index.

    :param old_index: source index name; must already exist
    :param new_index: destination index name; must already exist
    :param remove_old_data: when True (default), delete every document from
        ``old_index`` after the copy completes
    :return: raw ``_reindex`` API response
    :raises ValueError: if either index does not exist
    """
    if not self.has_index(old_index) or not self.has_index(new_index):
        raise ValueError(f'at least one of the indices do not exist: [{old_index}, {new_index}]')
    # https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch.reindex
    result = self._engine.reindex(
        body={
            "source": {
                "index": old_index,
                # "query": {
                #     "match_all": {}
                # }
            },
            "dest": {
                "index": new_index
            }
        }
    )
    # Bug fix: the remove_old_data flag was previously ignored and the
    # source index was purged unconditionally.
    if remove_old_data:
        self.delete_by_query({'query': {'match_all': {}}}, old_index)
    return result

def create_index(self, index_name, index_body):
result = self._engine.indices.create(index=index_name, body=index_body, include_type_name=False)
if 'acknowledged' not in result:
Expand All @@ -66,6 +86,7 @@ def swap_index_for_alias(self, alias_name, old_index_name, new_index_name):
try:
temp_result = self._engine.indices.delete_alias(index=old_index_name, name=alias_name)
except NotFoundError as ee:
LOGGER.exception(f'error while unlinking {old_index_name} from {alias_name}')
temp_result = {}
result = self.create_alias(new_index_name, alias_name)
return result
Expand All @@ -91,6 +112,7 @@ def delete_index(self, index_name):
return result['acknowledged']

def index_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
# https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch.bulk
doc_dict = self.__get_doc_dict(docs, doc_ids, doc_dict)
body = []
for k, v in doc_dict.items():
Expand All @@ -115,9 +137,9 @@ def index_one(self, doc, doc_id, index=None):
body=doc, doc_type=DEFAULT_TYPE, id=doc_id)
LOGGER.info('indexed. result: {}'.format(index_result))
pass
except:
except Exception as e:
LOGGER.exception('cannot add a new index with id: {} for index: {}'.format(doc_id, index))
return None
raise e
return self

def update_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
Expand Down
95 changes: 95 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/archive_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from cumulus_lambda_functions.lib.json_validator import JsonValidator

from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants

from cumulus_lambda_functions.lib.aws.es_abstract import ESAbstract

from cumulus_lambda_functions.lib.aws.es_factory import ESFactory


class UdsArchiveConfigIndex:
    """CRUD operations for DAAC archive configurations.

    Configurations live in the percolator-backed granules indices; the
    read/write alias names are derived from the tenant and venue set via
    :meth:`set_tenant_venue` (suffix ``_perc``).
    """

    # JSON schema used to validate incoming archive-config documents.
    basic_schema = {
        'type': 'object',
        'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
        'properties': {
            'daac_collection_id': {'type': 'string'},
            'daac_sns_topic_arn': {'type': 'string'},
            'daac_data_version': {'type': 'string'},
            'collection': {'type': 'string'},
            'ss_username': {'type': 'string'},
            'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
        }
    }

    def __init__(self, es_url, es_port=443):
        """
        :param es_url: Elasticsearch/OpenSearch base URL
        :param es_port: HTTPS port (defaults to 443)
        """
        # NOTE(review): index='TODO' is a placeholder; every call below
        # passes an explicit alias name, so the default index is unused.
        self.__es: ESAbstract = ESFactory().get_instance('AWS',
                                                         index='TODO',
                                                         base_url=es_url,
                                                         port=es_port)
        self.__tenant, self.__venue = '', ''

    def set_tenant_venue(self, tenant, venue):
        """Set the tenant & venue used to build alias names; returns self (fluent)."""
        self.__tenant, self.__venue = tenant, venue
        return self

    def get_config(self, collection_id, username=None):
        """Return all archive configs for a collection (optionally filtered by user).

        :param collection_id: collection the configs belong to
        :param username: when given, restrict results to this saved-search owner
        :return: list of config documents (``_source`` dicts)
        """
        read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        conditions = [{"term": {"collection": {"value": collection_id}}}]
        if username is not None:
            conditions.append({"term": {"ss_username": {"value": username}}})
        result = self.__es.query({
            'size': 9999,
            'query': {
                'bool': {
                    'must': conditions
                }
            }
        }, read_alias_name)
        return [k['_source'] for k in result['hits']['hits']]

    def add_new_config(self, ingesting_dict: dict):
        """Validate and index a new archive configuration.

        The document id is ``<daac_collection_name>__<collection>``; the
        percolator query (``ss_query``) matches granules of the collection.

        :param ingesting_dict: document matching :attr:`basic_schema`;
            the caller's dict is NOT modified.
        :raises ValueError: when schema validation fails
        """
        result = JsonValidator(self.basic_schema).validate(ingesting_dict)
        if result is not None:
            raise ValueError(f'input ingesting_dict has basic_schema validation errors: {result}')
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        # Bug fix: work on a shallow copy so popping the daac_collection_id
        # key does not mutate the caller's dict.
        ingesting_dict = {**ingesting_dict}
        ingesting_dict['daac_collection_name'] = ingesting_dict.pop('daac_collection_id')
        ingesting_dict['ss_query'] = {
            "bool": {
                "must": [{
                    "term": {"collection": {"value": ingesting_dict['collection']} }
                }]
            }
        }
        result = self.__es.index_one(ingesting_dict, f"{ingesting_dict['daac_collection_name']}__{ingesting_dict['collection']}", index=write_alias_name)
        return

    def delete_config(self, collection_id, daac_collection_id):
        """Delete the archive config matching both collection ids."""
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        self.__es.delete_by_query({
            'size': 9999,
            'query': {
                'bool': {
                    'must': [
                        { "term": {"collection": {"value": collection_id}} },
                        {"term": {"daac_collection_name": {"value": daac_collection_id}}},
                    ]
                }
            }
        }, write_alias_name)
        return

    def update_config(self, collection_id, daac_collection_id, daac_sns_topic_arn, username):
        """Overwrite an existing archive config document.

        :param collection_id: collection the config belongs to
        :param daac_collection_id: DAAC-side collection name (part of the doc id)
        :param daac_sns_topic_arn: SNS topic ARN to publish archive messages to
        :param username: saved-search owner recorded on the document
        """
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        result = self.__es.update_one({
            "collection": collection_id,
            "daac_collection_name": daac_collection_id,
            "daac_sns_topic_arn": daac_sns_topic_arn,
            "ss_query": {
                "bool": {
                    "must": [{
                        "term": {"collection": {"value": collection_id}}
                    }]
                }
            },
            "ss_username": username,
        }, f'{daac_collection_id}__{collection_id}', index=write_alias_name)
        return
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self):
# "event_time": {"type": "long"}
# }
self.__default_fields = GranulesIndexMapping.stac_mappings
self.__ss_fields = GranulesIndexMapping.percolator_mappings

@staticmethod
def to_es_bbox(bbox_array):
Expand Down Expand Up @@ -68,8 +69,13 @@ def default_fields(self, val):
self.__default_fields = val
return

def __add_custom_mappings(self, es_mapping: dict):
def __add_custom_mappings(self, es_mapping: dict, include_perc=False):
potential_ss_fields = {} if not include_perc else self.__ss_fields
customized_es_mapping = deepcopy(self.default_fields)
customized_es_mapping = {
**potential_ss_fields,
**self.default_fields,
}
customized_es_mapping['properties']['properties'] = {
**es_mapping,
**self.default_fields['properties']['properties'],
Expand Down Expand Up @@ -102,6 +108,7 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
tenant = tenant.replace(':', '--')
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()

current_alias = self.__es.get_alias(write_alias_name)
# {'meta_labels_v2': {'aliases': {'metadata_labels': {}}}}
current_index_name = f'{write_alias_name}__v0' if current_alias == {} else [k for k in current_alias.keys()][0]
Expand All @@ -122,6 +129,28 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
self.__es.create_index(new_index_name, index_mapping)
self.__es.create_alias(new_index_name, read_alias_name)
self.__es.swap_index_for_alias(write_alias_name, current_index_name, new_index_name)

write_perc_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}_perc'.lower().strip()
read_perc_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}_perc'.lower().strip()
current_perc_alias = self.__es.get_alias(write_perc_alias_name)
current_perc_index_name = f'{write_alias_name}_perc__v0' if current_perc_alias == {} else [k for k in current_perc_alias.keys()][0]
new_perc_index_name = f'{DBConstants.granules_index_prefix}_{tenant}_{tenant_venue}_perc__v{new_version:02d}'.lower().strip()
customized_perc_es_mapping = self.__add_custom_mappings(es_mapping, True)
LOGGER.debug(f'customized_perc_es_mapping: {customized_perc_es_mapping}')
perc_index_mapping = {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
},
"mappings": {
"dynamic": "strict",
"properties": customized_perc_es_mapping,
}
}
self.__es.create_index(new_perc_index_name, perc_index_mapping)
self.__es.create_alias(new_perc_index_name, read_perc_alias_name)
self.__es.swap_index_for_alias(write_perc_alias_name, current_perc_index_name, new_perc_index_name)
self.__es.migrate_index_data(current_perc_index_name, new_perc_index_name)
return

def get_latest_index(self, tenant, tenant_venue):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from cumulus_lambda_functions.lib.metadata_extraction.echo_metadata import EchoMetadata
from cumulus_lambda_functions.lib.time_utils import TimeUtils
from cumulus_lambda_functions.metadata_stac_generate_cmr.stac_input_metadata import StacInputMetadata
from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

Expand Down
Empty file.
13 changes: 13 additions & 0 deletions cumulus_lambda_functions/mock_daac/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
    """Mock-DAAC Lambda entry point — placeholder, no logic implemented.

    :param event: SQS event carrying an SNS notification record batch; the
        commit history for this file keeps a full sample payload.
    :param context: Lambda runtime context (unused).
    :return: None
    """
    LambdaLoggerGenerator.remove_default_handlers()  # clear AWS's default log handlers
    print('To be implemented later')
    return
2 changes: 1 addition & 1 deletion cumulus_lambda_functions/uds_api/custom_meta_admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from cumulus_lambda_functions.cumulus_es_setup.es_setup import SetupESIndexAlias
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.uds_api.dapa.auth_crud import AuthCrud, AuthDeleteModel, AuthListModel, AuthAddModel
from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
from cumulus_lambda_functions.uds_api.fast_api_utils import FastApiUtils
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants
from fastapi import APIRouter, HTTPException, Request, Response
Expand Down
Loading

0 comments on commit 7844796

Please sign in to comment.