release/7.12.2 #409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 8 commits, Aug 6, 2024
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [7.12.2] - 2024-08-06
### Fixed
- [#407](https://github.com/unity-sds/unity-data-services/pull/407) fix: download from http with stream enabled

## [7.12.1] - 2024-07-23
### Fixed
- [#399](https://github.com/unity-sds/unity-data-services/pull/399) fix: replace health check ssm

## [7.12.0] - 2024-07-23
### Changed
- [#398](https://github.com/unity-sds/unity-data-services/pull/398) feat: add mock daac lambda logic

## [7.11.0] - 2024-07-22
### Changed
- [#396](https://github.com/unity-sds/unity-data-services/pull/396) feat: adding daac archive config

## [7.10.1] - 2024-07-10
### Fixed
- [#393](https://github.com/unity-sds/unity-data-services/pull/393) fix: less than 200 is ok. not error
Empty file.
13 changes: 13 additions & 0 deletions cumulus_lambda_functions/daac_archiver/lambda_function.py
@@ -0,0 +1,13 @@
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
    """
    :param event:
    :param context:
    :return:
{'Records': [{'messageId': '6ff7c6fd-4053-4ab4-bc12-c042be1ed275', 'receiptHandle': 'AQEBYASiFPjQT5JBI2KKCTF/uQhHfJt/tHhgucslQQdvkNVxcXCNi2E5Ux4U9N0eu7RfvlnvtycjUh0gdL7jIeoyH+VRKSF61uAJuT4p31BsNe0GYu49N9A6+kxjP/RrykR7ZofmQRdHToX1ugRc76SMRic4H/ZZ89YAHA2QeomJFMrYywIxlk8OAzYaBf2dQI7WexjY5u1CW00XNMbTGyTo4foVPxcSn6bdFpfgxW/L7yJMX/0YQvrA9ruiuQ+lrui+6fWYh5zEk3f5v1bYtUQ6DtyyfbtMHZQJTJpUlWAFRzzN+3melilH7FySyOGDXhPb0BOSzmdKq9wBbfLW/YPb7l99ejq4GfRfj8LyI4EtB96vTeUw4LCgUqbZcBrxbGBLUXMacweh+gCjHav9ylqr2SeOiqG3vWPq9pwFYQIDqNE=', 'body': '{\n "Type" : "Notification",\n "MessageId" : "33e1075a-435c-5217-a33d-59fae85e19b2",\n "TopicArn" : "arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester",\n "Subject" : "Amazon S3 Notification",\n "Message" : "{\\"Service\\":\\"Amazon S3\\",\\"Event\\":\\"s3:TestEvent\\",\\"Time\\":\\"2024-04-22T18:13:22.416Z\\",\\"Bucket\\":\\"uds-sbx-cumulus-staging\\",\\"RequestId\\":\\"DQ4T0GRVFPSX45C9\\",\\"HostId\\":\\"gHBFnYNmfnGDZBmqoQwA3RScjtjBk5lr426moGxu8IDpe5UhWAqNTxHqilWBoPN1njzIrzNrf8c=\\"}",\n "Timestamp" : "2024-04-22T18:13:22.434Z",\n "SignatureVersion" : "1",\n "Signature" : "RvSxqpU7J7CCJXbin9cXqTxzjMjgAUFtk/n454mTMcOe5x3Ay1w4AHfzyeYQCFBdLHNBa8n3OdMDoDlJqyVQMb8k+nERaiZWN2oqFVDRqT9pqSr89b+4FwlhPv6TYy2pBa/bgjZ4cOSYsey1uSQ3hjl0idfssvuV5cCRxQScbA+yu8Gcv9K7Oqgy01mC0sDHiuPIifhFXxupG5ygbjqoHIB+1gdMEbBwyixoY5GOpHM/O2uHNF+dJDjax1WMxQ2FzVjiFeCa+tNcjovF059+tx2v1YmDq/kEAFrN6DAtP6R4zKag62P9jkvjU/wHYJ2jjXmZAqoG+nuzAo24HiZPSw==",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester:76cbefa1-addf-45c2-97e1-ae16986b195b"\n}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1713809602474', 'SenderId': 'AIDAIYLAVTDLUXBIEIX46', 'ApproximateFirstReceiveTimestamp': '1713809602483'}, 'messageAttributes': {}, 'md5OfBody': 'c6d06d1b742ad5bd2cfe5f542640aad2', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester', 'awsRegion': 'us-west-2'}]}
"""
LambdaLoggerGenerator.remove_default_handlers()
print('To be implemented later')
return
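The handler above is only a stub; its docstring records the SQS-wrapped SNS event shape it will eventually receive. A minimal sketch of how such an event could be unpacked with this repo's AwsMessageTransformers.sqs_sns once the handler is implemented (the body below is an illustration, not the merged code):

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
    LambdaLoggerGenerator.remove_default_handlers()
    # unwrap Records[0] -> SQS body -> SNS envelope -> inner 'Message' JSON
    sns_msg_body = AwsMessageTransformers().sqs_sns(event)
    return sns_msg_body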
24 changes: 24 additions & 0 deletions cumulus_lambda_functions/granules_to_es/granules_index_mapping.py
@@ -1,4 +1,28 @@
class GranulesIndexMapping:
    percolator_mappings = {
        "daac_collection_name": {
            "type": "keyword"
        },
        "daac_data_version": {
            "type": "keyword"
        },
        "archiving_types": {
            "type": "object",
            "properties": {
                "data_type": {"type": "keyword"},
                "file_extension": {"type": "keyword"},
            }
        },
        "daac_sns_topic_arn": {
            "type": "keyword"
        },
        "ss_query": {
            "type": "percolator"
        },
        "ss_username": {
            "type": "keyword"
        },
    }
    stac_mappings = {
        "event_time": {"type": "long"},
        "type": {"type": "keyword"},
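The new percolator_mappings block stores DAAC archiving configs alongside a percolator-typed ss_query field, which lets Elasticsearch run stored queries in reverse: instead of matching a query against indexed documents, a percolate search matches an incoming document against every stored query. A hedged sketch of how an ingested granule could be matched against the stored configs (the alias name, the client variable, and the sample collection id are placeholders, not code from this PR):

percolate_body = {
    'query': {
        'percolate': {
            'field': 'ss_query',  # the percolator-typed field defined above
            'document': {'collection': 'URN:NASA:UNITY:TENANT:VENUE:EXAMPLE___001'},
        }
    }
}
# matching_configs = es.query(percolate_body, 'uds_granules_read_tenant_venue_perc')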
@@ -14,7 +14,7 @@
from cumulus_lambda_functions.lib.json_validator import JsonValidator

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
-from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
+from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

32 changes: 31 additions & 1 deletion cumulus_lambda_functions/lib/aws/aws_message_transformers.py
@@ -23,6 +23,24 @@ class AwsMessageTransformers:
        },
        'required': ['Records']
    }
    SNS_EVENT_SCHEMA = {
        'type': 'object',
        'properties': {
            'Records': {
                'type': 'array',
                'minItems': 1,
                'maxItems': 1,
                'items': {
                    'type': 'object',
                    'properties': {
                        'Sns': {'type': 'object'}
                    },
                    'required': ['Sns']
                }
            }
        },
        'required': ['Records']
    }

    SNS_SCHEMA = {
        "type": "object",
@@ -86,10 +104,22 @@ def sqs_sns(self, raw_msg: json):
        sns_msg_body = json.loads(sns_msg_body)
        return sns_msg_body

    def get_message_from_sns_event(self, raw_msg: json):
        result = JsonValidator(self.SNS_EVENT_SCHEMA).validate(raw_msg)
        if result is not None:
            raise ValueError(f'input json has SNS_EVENT_SCHEMA validation errors: {result}')
        sns_msg = raw_msg['Records'][0]['Sns']
        result = JsonValidator(self.SNS_SCHEMA).validate(sns_msg)
        if result is not None:
            raise ValueError(f'input json has SNS validation errors: {result}')
        sns_msg_body = sns_msg['Message']
        sns_msg_body = json.loads(sns_msg_body)
        return sns_msg_body

    def get_s3_from_sns(self, sns_msg_body):
        result = JsonValidator(self.S3_RECORD_SCHEMA).validate(sns_msg_body)
        if result is not None:
-            raise ValueError(f'sqs_msg did not pass SQS_MSG_SCHEMA: {result}')
+            raise ValueError(f'sns_msg_body did not pass S3_RECORD_SCHEMA: {result}')
        s3_summary = {
            'eventName': sns_msg_body['Records'][0]['eventName'],
            'bucket': sns_msg_body['Records'][0]['s3']['bucket']['name'],
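A hedged usage sketch of the two methods above for a Lambda triggered directly by SNS: the event envelope is validated, the inner Message is parsed, and the S3 summary is extracted. The sample event is trimmed to the keys the visible code reads; the full SNS_SCHEMA and S3_RECORD_SCHEMA validations may require additional fields.

import json

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers

sns_event = {'Records': [{'Sns': {  # trimmed sample; real schemas may require more keys
    'Message': json.dumps({'Records': [{'eventName': 's3:ObjectCreated:Put',
                                        's3': {'bucket': {'name': 'uds-sbx-cumulus-staging'},
                                               'object': {'key': 'some/granule/file.nc'}}}]}),
}}]}
transformer = AwsMessageTransformers()
s3_record = transformer.get_message_from_sns_event(sns_event)  # inner 'Message' as a dict
s3_summary = transformer.get_s3_from_sns(s3_record)  # {'eventName': ..., 'bucket': ..., ...}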
15 changes: 15 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_sns.py
@@ -32,3 +32,18 @@ def publish_message(self, msg_str: str):
            # MessageGroupId='string'
        )
        return response

    def create_sqs_subscription(self, sqs_arn):
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/subscribe.html
        if self.__topic_arn == '':
            raise ValueError('missing topic arn to create subscription')
        response = self.__sns_client.subscribe(
            TopicArn=self.__topic_arn,
            Protocol='sqs',
            Endpoint=sqs_arn,  # for the sqs protocol, the endpoint is the ARN of an Amazon SQS queue
            # Attributes={
            #     'string': 'string'
            # },
            ReturnSubscriptionArn=True  # if True, the returned value is always the subscription ARN, even when the subscription requires confirmation
        )
        return response['SubscriptionArn']
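A hedged usage sketch of the new subscription helper; the constructor and set_topic_arn chaining are assumptions about the surrounding AwsSns class, which is not fully shown in this diff. Note, as general SNS-to-SQS behavior rather than anything in this PR, that the queue's access policy must also allow the topic to send messages before deliveries succeed.

sns = AwsSns().set_topic_arn('arn:aws:sns:us-west-2:123456789012:granules_cnm_ingester')  # hypothetical setter
subscription_arn = sns.create_sqs_subscription('arn:aws:sqs:us-west-2:123456789012:granules_queue')
print(subscription_arn)  # full ARN immediately, since ReturnSubscriptionArn=True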
3 changes: 3 additions & 0 deletions cumulus_lambda_functions/lib/aws/es_abstract.py
@@ -6,6 +6,9 @@

class ESAbstract(ABC):
    @abstractmethod
    def migrate_index_data(self, old_index, new_index):
        return

    @abstractmethod
    def create_index(self, index_name, index_body):
        return

28 changes: 25 additions & 3 deletions cumulus_lambda_functions/lib/aws/es_middleware.py
@@ -16,7 +16,8 @@ def __init__(self, index, base_url, port=443) -> None:
            raise ValueError(f'index or base_url is None')
        self.__index = index
        base_url = base_url.replace('https://', '')  # hide https
-        self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port}])
+        # https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch
+        self._engine = Elasticsearch(hosts=[{'host': base_url, 'port': port, 'use_ssl': True}])

    def __validate_index(self, index):
        if index is not None:
@@ -45,6 +46,25 @@ def __check_errors_for_bulk(self, index_result):
            LOGGER.exception('failed to add some items. details: {}'.format(err_list))
        return err_list

    def migrate_index_data(self, old_index, new_index, remove_old_data=True):
        if not self.has_index(old_index) or not self.has_index(new_index):
            raise ValueError(f'at least one of the indices does not exist: [{old_index}, {new_index}]')
        result = self._engine.reindex(
            body={
                "source": {
                    "index": old_index,
                    # "query": {
                    #     "match_all": {}
                    # }
                },
                "dest": {
                    "index": new_index
                }
            }
        )
        if remove_old_data:
            self.delete_by_query({'query': {'match_all': {}}}, old_index)
        return result

    def create_index(self, index_name, index_body):
        result = self._engine.indices.create(index=index_name, body=index_body, include_type_name=False)
        if 'acknowledged' not in result:
@@ -66,6 +86,7 @@ def swap_index_for_alias(self, alias_name, old_index_name, new_index_name):
        try:
            temp_result = self._engine.indices.delete_alias(index=old_index_name, name=alias_name)
        except NotFoundError as ee:
            LOGGER.exception(f'error while unlinking {old_index_name} from {alias_name}')
            temp_result = {}
        result = self.create_alias(new_index_name, alias_name)
        return result
@@ -91,6 +112,7 @@ def delete_index(self, index_name):
        return result['acknowledged']

    def index_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
        # https://elasticsearch-py.readthedocs.io/en/v7.13.4/api.html#elasticsearch.Elasticsearch.bulk
        doc_dict = self.__get_doc_dict(docs, doc_ids, doc_dict)
        body = []
        for k, v in doc_dict.items():
@@ -115,9 +137,9 @@ def index_one(self, doc, doc_id, index=None):
                                          body=doc, doc_type=DEFAULT_TYPE, id=doc_id)
            LOGGER.info('indexed. result: {}'.format(index_result))
            pass
-        except:
+        except Exception as e:
            LOGGER.exception('cannot add a new index with id: {} for index: {}'.format(doc_id, index))
-            return None
+            raise e
        return self

    def update_many(self, docs=None, doc_ids=None, doc_dict=None, index=None):
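A brief sketch of the new migration helper in use, assuming an EsMiddleware instance named es and two existing indices (the index names are placeholders): documents are copied server-side via the _reindex API, and, with remove_old_data left at its default of True, the old index is then emptied with delete_by_query.

result = es.migrate_index_data('uds_granules_tenant_venue_perc__v01',
                               'uds_granules_tenant_venue_perc__v02')
print(result.get('total'))  # document count reported by _reindex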
94 changes: 94 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/archive_index.py
@@ -0,0 +1,94 @@
from cumulus_lambda_functions.lib.json_validator import JsonValidator
from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants

from cumulus_lambda_functions.lib.aws.es_abstract import ESAbstract

from cumulus_lambda_functions.lib.aws.es_factory import ESFactory


class UdsArchiveConfigIndex:
    basic_schema = {
        'type': 'object',
        'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
        'properties': {
            'daac_collection_id': {'type': 'string'},
            'daac_sns_topic_arn': {'type': 'string'},
            'daac_data_version': {'type': 'string'},
            'collection': {'type': 'string'},
            'ss_username': {'type': 'string'},
            'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
        }
    }

    def __init__(self, es_url, es_port=443):
        self.__es: ESAbstract = ESFactory().get_instance('AWS',
                                                         index='TODO',
                                                         base_url=es_url,
                                                         port=es_port)
        self.__tenant, self.__venue = '', ''

    def set_tenant_venue(self, tenant, venue):
        self.__tenant, self.__venue = tenant, venue
        return self

    def get_config(self, collection_id, username=None):
        read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        conditions = [{"term": {"collection": {"value": collection_id}}}]
        if username is not None:
            conditions.append({"term": {"ss_username": {"value": username}}})
        result = self.__es.query({
            'size': 9999,
            'query': {
                'bool': {
                    'must': conditions
                }
            }
        }, read_alias_name)
        return [k['_source'] for k in result['hits']['hits']]

    def add_new_config(self, ingesting_dict: dict):
        result = JsonValidator(self.basic_schema).validate(ingesting_dict)
        if result is not None:
            raise ValueError(f'input ingesting_dict has basic_schema validation errors: {result}')
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        ingesting_dict['daac_collection_name'] = ingesting_dict.pop('daac_collection_id')
        ingesting_dict['ss_query'] = {
            "bool": {
                "must": [{
                    "term": {"collection": {"value": ingesting_dict['collection']}}
                }]
            }
        }
        result = self.__es.index_one(ingesting_dict, f"{ingesting_dict['daac_collection_name']}__{ingesting_dict['collection']}", index=write_alias_name)
        return

    def delete_config(self, collection_id, daac_collection_id):
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        self.__es.delete_by_query({
            'size': 9999,
            'query': {
                'bool': {
                    'must': [
                        {"term": {"collection": {"value": collection_id}}},
                        {"term": {"daac_collection_name": {"value": daac_collection_id}}},
                    ]
                }
            }
        }, write_alias_name)
        return

    def update_config(self, collection_id, daac_collection_id, daac_sns_topic_arn, username):
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}_perc'.lower().strip()
        result = self.__es.update_one({
            "collection": collection_id,
            "daac_collection_name": daac_collection_id,
            "daac_sns_topic_arn": daac_sns_topic_arn,
            "ss_query": {
                "bool": {
                    "must": [{
                        "term": {"collection": {"value": collection_id}}
                    }]
                }
            },
            "ss_username": username,
        }, f'{daac_collection_id}__{collection_id}', index=write_alias_name)
        return
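A hedged end-to-end sketch of the new helper; the ES endpoint, tenant/venue, and collection identifiers are placeholders. add_new_config validates the payload, renames daac_collection_id to daac_collection_name, attaches the generated ss_query, and indexes the document through the write alias; get_config then reads it back through the read alias.

archive_index = UdsArchiveConfigIndex('abc.us-west-2.es.amazonaws.com')  # placeholder endpoint
archive_index.set_tenant_venue('uds_local', 'dev')
archive_index.add_new_config({
    'daac_collection_id': 'DAAC_COLLECTION_001',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:123456789012:daac_archive_topic',
    'daac_data_version': '1',
    'collection': 'URN:NASA:UNITY:UDS_LOCAL:DEV:EXAMPLE___001',
    'ss_username': 'uds_user',
    'archiving_types': [{'data_type': 'data', 'file_extension': ['.nc']}],
})
configs = archive_index.get_config('URN:NASA:UNITY:UDS_LOCAL:DEV:EXAMPLE___001', username='uds_user')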
@@ -30,6 +30,7 @@ def __init__(self):
# "event_time": {"type": "long"}
# }
self.__default_fields = GranulesIndexMapping.stac_mappings
self.__ss_fields = GranulesIndexMapping.percolator_mappings

@staticmethod
def to_es_bbox(bbox_array):
@@ -68,8 +69,13 @@ def default_fields(self, val):
        self.__default_fields = val
        return

-    def __add_custom_mappings(self, es_mapping: dict):
+    def __add_custom_mappings(self, es_mapping: dict, include_perc=False):
+        potential_ss_fields = {} if not include_perc else self.__ss_fields
        customized_es_mapping = deepcopy(self.default_fields)
+        customized_es_mapping = {
+            **potential_ss_fields,
+            **self.default_fields,
+        }
        customized_es_mapping['properties']['properties'] = {
            **es_mapping,
            **self.default_fields['properties']['properties'],
@@ -102,6 +108,7 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
        tenant = tenant.replace(':', '--')
        write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
        read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()

        current_alias = self.__es.get_alias(write_alias_name)
        # {'meta_labels_v2': {'aliases': {'metadata_labels': {}}}}
        current_index_name = f'{write_alias_name}__v0' if current_alias == {} else [k for k in current_alias.keys()][0]
@@ -122,6 +129,28 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
        self.__es.create_index(new_index_name, index_mapping)
        self.__es.create_alias(new_index_name, read_alias_name)
        self.__es.swap_index_for_alias(write_alias_name, current_index_name, new_index_name)

        write_perc_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}_perc'.lower().strip()
        read_perc_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}_perc'.lower().strip()
        current_perc_alias = self.__es.get_alias(write_perc_alias_name)
        current_perc_index_name = f'{write_alias_name}_perc__v0' if current_perc_alias == {} else [k for k in current_perc_alias.keys()][0]
        new_perc_index_name = f'{DBConstants.granules_index_prefix}_{tenant}_{tenant_venue}_perc__v{new_version:02d}'.lower().strip()
        customized_perc_es_mapping = self.__add_custom_mappings(es_mapping, True)
        LOGGER.debug(f'customized_perc_es_mapping: {customized_perc_es_mapping}')
        perc_index_mapping = {
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 2
            },
            "mappings": {
                "dynamic": "strict",
                "properties": customized_perc_es_mapping,
            }
        }
        self.__es.create_index(new_perc_index_name, perc_index_mapping)
        self.__es.create_alias(new_perc_index_name, read_perc_alias_name)
        self.__es.swap_index_for_alias(write_perc_alias_name, current_perc_index_name, new_perc_index_name)
        self.__es.migrate_index_data(current_perc_index_name, new_perc_index_name)
        return

    def get_latest_index(self, tenant, tenant_venue):
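A hedged sketch of how the rollover above might be triggered (the no-argument constructor and the extra mapping are assumptions, as they are not shown in this diff): each call creates new versioned granules and percolator indices, repoints the read aliases, swaps the write aliases, and copies existing archive configs forward via migrate_index_data so stored percolator queries survive the mapping change.

from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

granules_index = GranulesDbIndex()  # hypothetical: constructor args not shown in this diff
granules_index.create_new_index('uds_local', 'dev', {'soil_moisture': {'type': 'double'}})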
@@ -13,7 +13,7 @@
from cumulus_lambda_functions.lib.metadata_extraction.echo_metadata import EchoMetadata
from cumulus_lambda_functions.lib.time_utils import TimeUtils
from cumulus_lambda_functions.metadata_stac_generate_cmr.stac_input_metadata import StacInputMetadata
-from cumulus_lambda_functions.uds_api.dapa.granules_db_index import GranulesDbIndex
+from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

Expand Down
Empty file.