
release/9.12.0 #584


Merged · 11 commits · May 28, 2025
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,26 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [9.12.0] - 2025-05-24
### Changed
- [#585](https://github.com/unity-sds/unity-data-services/pull/585) feat: add ram size in lambdas

## [9.11.9] - 2025-05-23
### Fixed
- [#582](https://github.com/unity-sds/unity-data-services/pull/582) fix: use correct schema

## [9.11.8] - 2025-05-21
### Fixed
- [#580](https://github.com/unity-sds/unity-data-services/pull/580) fix: update-archival-index-mapping

## [9.11.7] - 2025-05-21
### Fixed
- [#578](https://github.com/unity-sds/unity-data-services/pull/578) fix: sending sns to daac

## [9.11.6] - 2025-05-14
### Fixed
- [#575](https://github.com/unity-sds/unity-data-services/pull/575) fix: lib version bump

## [9.11.5] - 2025-04-24
### Fixed
- [#568](https://github.com/unity-sds/unity-data-services/pull/568) fix: case insensitive
34 changes: 30 additions & 4 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -43,20 +43,37 @@ def get_cnm_response_json_file(self, potential_file, granule_id):
            return None
        if len(cnm_response_keys) > 1:
            LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}')
-        cnm_response_keys = cnm_response_keys[0]
+        # Assuming the filenames share the same pattern and embed the processing date, sorting recovers recency.
+        cnm_response_keys = sorted(cnm_response_keys)[-1]  # the last key after sorting should be the most recent one
        LOGGER.debug(f'cnm_response_keys: {cnm_response_keys}')
        local_file = self.__s3.set_s3_url(f's3://{self.__s3.target_bucket}/{cnm_response_keys}').download('/tmp')
        cnm_response_json = FileUtils.read_json(local_file)
        FileUtils.remove_if_exists(local_file)
        return cnm_response_json

    @staticmethod
    def revert_to_s3_url(input_url):
        if input_url.startswith("s3://"):
            return input_url
        if input_url.startswith("http://") or input_url.startswith("https://"):
            parts = input_url.split('/', 3)
            if len(parts) < 4:
                raise ValueError(f'invalid url: {input_url}')
            path_parts = parts[3].split('/', 1)
            if len(path_parts) != 2:
                raise ValueError(f'invalid url: {input_url}')
            bucket, key = path_parts
            return f"s3://{bucket}/{key}"
        raise ValueError(f'unknown schema: {input_url}')

    def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
        granule_files = uds_cnm_json['product']['files']
        if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
            return granule_files  # TODO remove missing md5?
        archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
        result_files = []
        for each_file in granule_files:
+            LOGGER.debug(f'each_file: {each_file}')
            """
            {
                "type": "data",
@@ -71,6 +88,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
            if each_file['type'] not in archiving_types:
                continue
            file_extensions = archiving_types[each_file['type']]
+            each_file['uri'] = self.revert_to_s3_url(each_file['uri'])
            if len(file_extensions) < 1:
                result_files.append(each_file)  # TODO remove missing md5?
            temp_filename = each_file['name'].upper().strip()
@@ -79,28 +97,36 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
        return result_files

    def send_to_daac_internal(self, uds_cnm_json: dict):
        LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
        granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier'])  # This is normally meant for a collection, but since our granule IDs are prefixed with the collection ID, we can use it here too.
        self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
        daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
        if daac_config is None or len(daac_config) < 1:
            LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}')
            return
        daac_config = daac_config[0]  # TODO This does not currently support more than 1 DAAC.
        result = JsonValidator(UdsArchiveConfigIndex.db_record_schema).validate(daac_config)
        if result is not None:
            raise ValueError(f'daac_config does not have a valid schema. Please re-add the DAAC config: {result} for {daac_config}')
        try:
            self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn'])
            daac_cnm_message = {
-                "collection": daac_config['daac_collection_name'],
+                "collection": {
+                    'name': daac_config['daac_collection_name'],
+                    'version': daac_config['daac_data_version'],
+                },
                "identifier": uds_cnm_json['identifier'],
                "submissionTime": f'{TimeUtils.get_current_time()}Z',
                "provider": granule_identifier.tenant,
                "version": "1.6.0",  # TODO this is hardcoded?
                "product": {
                    "name": granule_identifier.id,
-                    "dataVersion": daac_config['daac_data_version'],
+                    # "dataVersion": daac_config['daac_data_version'],
                    'files': self.__extract_files(uds_cnm_json, daac_config),
                }
            }
-            self.__sns.publish_message(json.dumps(daac_cnm_message))
+            LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}')
+            self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True)
            self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
                'archive_status': 'cnm_s_success',
                'archive_error_message': '',
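The new revert_to_s3_url helper normalizes HTTP(S) distribution URLs back to s3:// form before file entries are forwarded to the DAAC. A minimal usage sketch, assuming the enclosing class is DaacArchiverLogic and that the first path segment of the hypothetical HTTPS URL is the bucket name:

from cumulus_lambda_functions.daac_archiver.daac_archiver_logic import DaacArchiverLogic

# hypothetical URLs; the first path segment after the host is treated as the bucket
print(DaacArchiverLogic.revert_to_s3_url('https://data.example-daac.gov/uds-dev-bucket/granules/g1/g1.nc'))
# s3://uds-dev-bucket/granules/g1/g1.nc
print(DaacArchiverLogic.revert_to_s3_url('s3://uds-dev-bucket/granules/g1/g1.nc'))
# an s3 URL passes through unchanged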
@@ -9,6 +9,12 @@ class GranulesIndexMapping:
"daac_data_version": {
"type": "keyword"
},
"daac_role_arn": {
"type": "keyword"
},
"daac_role_session_name": {
"type": "keyword"
},
"archiving_types": {
"type": "object",
"properties": {
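Both new fields are plain keyword mappings so the archiver can filter and update them by exact value. A hedged sketch of applying the same mapping change by hand with opensearch-py; the endpoint and index name are placeholders:

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{'host': 'localhost', 'port': 9200}])  # placeholder endpoint
client.indices.put_mapping(
    index='unity_granules_example',  # placeholder index name
    body={'properties': {
        'daac_role_arn': {'type': 'keyword'},
        'daac_role_session_name': {'type': 'keyword'},
    }},
)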
4 changes: 4 additions & 0 deletions cumulus_lambda_functions/granules_to_es/granules_indexer.py
@@ -78,8 +78,12 @@ def start(self):
            return
        self.__cumulus_record = incoming_msg['record']
        if len(self.__cumulus_record['files']) < 1:
+            LOGGER.debug('No files in cumulus record. Not inserting to ES')
            # TODO ingest updating stage?
            return
+        if 'status' not in self.__cumulus_record or self.__cumulus_record['status'].upper() != 'COMPLETED':
+            LOGGER.debug('status is missing or not COMPLETED. Not inserting to ES')
+            return
        stac_input_meta = None
        potential_files = self.__get_potential_files()
        LOGGER.debug(f'potential_files: {potential_files}')
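The two new guard clauses keep incomplete records out of the index: nothing is written unless the Cumulus record has at least one file and a COMPLETED status. A standalone sketch of the equivalent check, with an abbreviated record shape:

def should_index(cumulus_record: dict) -> bool:
    # mirror of the new guard clauses in start()
    if len(cumulus_record.get('files', [])) < 1:
        return False  # nothing to index
    if cumulus_record.get('status', '').upper() != 'COMPLETED':
        return False  # still running, or failed
    return True

assert should_index({'files': [{'key': 'g1.nc'}], 'status': 'completed'})
assert not should_index({'files': [], 'status': 'completed'})
assert not should_index({'files': [{'key': 'g1.nc'}], 'status': 'running'})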
22 changes: 21 additions & 1 deletion cumulus_lambda_functions/lib/uds_db/archive_index.py
@@ -15,11 +15,31 @@ class UdsArchiveConfigIndex:
    basic_schema = {
        'type': 'object',
        "additionalProperties": False,
-        'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
+        'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', 'daac_role_session_name',
+                     'collection', 'ss_username', 'archiving_types'],
        'properties': {
            'daac_collection_id': {'type': 'string'},
            'daac_sns_topic_arn': {'type': 'string'},
            'daac_data_version': {'type': 'string'},
            'daac_role_arn': {'type': 'string'},
            'daac_role_session_name': {'type': 'string'},
            'collection': {'type': 'string'},
            'ss_username': {'type': 'string'},
            'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
        }
    }

    db_record_schema = {
        'type': 'object',
        'required': ['daac_collection_name', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn',
                     'daac_role_session_name',
                     'collection', 'ss_username', 'archiving_types'],
        'properties': {
            'daac_collection_name': {'type': 'string'},
            'daac_sns_topic_arn': {'type': 'string'},
            'daac_data_version': {'type': 'string'},
            'daac_role_arn': {'type': 'string'},
            'daac_role_session_name': {'type': 'string'},
            'collection': {'type': 'string'},
            'ss_username': {'type': 'string'},
            'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
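send_to_daac_internal validates each percolated record against db_record_schema before publishing, so a stored config that predates the new role fields fails fast with a clear error. A sketch of that check using the jsonschema package already pinned in requirements.txt; every ARN and name below is a placeholder:

from jsonschema import ValidationError, validate
from cumulus_lambda_functions.lib.uds_db.archive_index import UdsArchiveConfigIndex

daac_config = {
    'daac_collection_name': 'EXAMPLE_COLLECTION___001',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:000000000000:example-daac-cnm',
    'daac_data_version': '001',
    'daac_role_arn': 'arn:aws:iam::000000000000:role/example-daac-cnm-publisher',
    'daac_role_session_name': 'uds-daac-archiver',
    'collection': 'urn:nasa:unity:tenant:venue:EXAMPLE_COLLECTION___001',
    'ss_username': 'uds-user',
    'archiving_types': [{'data_type': 'data', 'file_extension': ['.nc']}],
}
validate(daac_config, UdsArchiveConfigIndex.db_record_schema)  # passes

daac_config.pop('daac_role_arn')
try:
    validate(daac_config, UdsArchiveConfigIndex.db_record_schema)
except ValidationError as error:
    print(error.message)  # 'daac_role_arn' is a required property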
4 changes: 4 additions & 0 deletions cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py
@@ -19,13 +19,17 @@ class DaacUpdateModel(BaseModel):
    daac_collection_id: str
    daac_data_version: Optional[str] = None
    daac_sns_topic_arn: Optional[str] = None
    daac_role_arn: Optional[str] = None
    daac_role_session_name: Optional[str] = None
    archiving_types: Optional[list[ArchivingTypesModel]] = None


class DaacAddModel(BaseModel):
    daac_collection_id: str
    daac_data_version: str
    daac_sns_topic_arn: str
    daac_role_arn: str
    daac_role_session_name: str
    archiving_types: Optional[list[ArchivingTypesModel]] = []


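On the API side, DaacAddModel requires the role fields up front, while DaacUpdateModel leaves them optional so partial updates stay valid. A short sketch with placeholder values:

from pydantic import ValidationError
from cumulus_lambda_functions.uds_api.dapa.daac_archive_crud import DaacAddModel

body = {
    'daac_collection_id': 'EXAMPLE_COLLECTION___001',
    'daac_data_version': '001',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:000000000000:example-daac-cnm',
    'daac_role_arn': 'arn:aws:iam::000000000000:role/example-daac-cnm-publisher',
    'daac_role_session_name': 'uds-daac-archiver',
}
print(DaacAddModel(**body))  # validates; archiving_types defaults to []

try:
    DaacAddModel(**{k: v for k, v in body.items() if k != 'daac_role_arn'})
except ValidationError as error:
    print(error.errors()[0]['loc'])  # ('daac_role_arn',)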
4 changes: 2 additions & 2 deletions requirements.txt
@@ -15,7 +15,7 @@ jsonschema==4.23.0
jsonschema-specifications==2023.12.1
lark==0.12.0
mangum==0.18.0
-mdps-ds-lib==1.1.1.dev701
+mdps-ds-lib==1.1.1.dev800
pydantic==2.9.2
pydantic_core==2.23.4
pygeofilter==0.2.4
@@ -37,4 +37,4 @@ typing_extensions==4.12.2
tzlocal==5.2
urllib3==1.26.11
uvicorn==0.30.6
-xmltodict==0.13.0
\ No newline at end of file
+xmltodict==0.13.0
2 changes: 1 addition & 1 deletion setup.py
@@ -12,7 +12,7 @@

setup(
    name="cumulus_lambda_functions",
-    version="9.11.5",
+    version="9.12.0",
    packages=find_packages(),
    install_requires=install_requires,
    package_data={
1 change: 1 addition & 0 deletions tf-module/unity-cumulus/daac_archiver.tf
@@ -38,6 +38,7 @@ resource "aws_lambda_function" "daac_archiver_response" {
  handler       = "cumulus_lambda_functions.daac_archiver.lambda_function.lambda_handler_response"
  runtime       = "python3.9"
  timeout       = 300
+  memory_size   = 256
  environment {
    variables = {
      LOG_LEVEL = var.log_level
2 changes: 2 additions & 0 deletions tf-module/unity-cumulus/granules_cnm_ingester.tf
@@ -6,6 +6,7 @@ resource "aws_lambda_function" "granules_cnm_ingester" {
  handler                        = "cumulus_lambda_functions.granules_cnm_ingester.lambda_function.lambda_handler"
  runtime                        = "python3.9"
  timeout                        = 300
+  memory_size                    = 512
  reserved_concurrent_executions = var.granules_cnm_ingester__lambda_concurrency # TODO
  environment {
    variables = {
@@ -66,6 +67,7 @@ resource "aws_lambda_function" "granules_cnm_response_writer" {
  handler                        = "cumulus_lambda_functions.granules_cnm_response_writer.lambda_function.lambda_handler"
  runtime                        = "python3.9"
  timeout                        = 300
+  memory_size                    = 256
  reserved_concurrent_executions = var.granules_cnm_response_writer__lambda_concurrency # TODO
  environment {
    variables = {
6 changes: 5 additions & 1 deletion tf-module/unity-cumulus/main.tf
@@ -43,6 +43,7 @@ resource "aws_lambda_function" "metadata_s4pa_generate_cmr" {
  handler       = "cumulus_lambda_functions.metadata_s4pa_generate_cmr.lambda_function.lambda_handler"
  runtime       = "python3.9"
  timeout       = 300
+  memory_size   = 256
  environment {
    variables = {
      LOG_LEVEL = var.log_level
@@ -65,6 +66,7 @@ resource "aws_lambda_function" "metadata_cas_generate_cmr" {
  handler       = "cumulus_lambda_functions.metadata_cas_generate_cmr.lambda_function.lambda_handler"
  runtime       = "python3.9"
  timeout       = 300
+  memory_size   = 256
  environment {
    variables = {
      LOG_LEVEL = var.log_level
@@ -86,6 +88,7 @@ resource "aws_lambda_function" "metadata_stac_generate_cmr" {
  handler       = "cumulus_lambda_functions.metadata_stac_generate_cmr.lambda_function.lambda_handler"
  runtime       = "python3.9"
  timeout       = 300
+  memory_size   = 256
  environment {
    variables = {
      LOG_LEVEL = var.log_level
@@ -112,6 +115,7 @@ resource "aws_lambda_function" "granules_to_es" {
  handler       = "cumulus_lambda_functions.granules_to_es.lambda_function.lambda_handler"
  runtime       = "python3.9"
  timeout       = 300
+  memory_size   = 256
  environment {
    variables = {
      LOG_LEVEL = var.log_level
@@ -137,7 +141,7 @@ resource "aws_lambda_function" "uds_api_1" {
  handler       = "cumulus_lambda_functions.uds_api.web_service.handler"
  runtime       = "python3.9"
  timeout       = 300
-
+  memory_size   = 512
  environment {
    variables = {
      CUMULUS_BASE = var.cumulus_base
25 changes: 25 additions & 0 deletions tf-module/unity-cumulus/uds_lambda_processing_role.tf
@@ -0,0 +1,25 @@
data "aws_iam_role" "lambda_processing" {
name = split("/", var.lambda_processing_role_arn)[1]
}

resource "aws_iam_policy" "uds_lambda_processing_policy" {
name = "${var.prefix}-uds_lambda_processing_policy"
description = "IAM policy for Lambda to access S3 bucket and publish to SNS topic in another account"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"sts:AssumeRole",
],
"Resource": "arn:aws:iam::*:role/*"
},
]
})
}

resource "aws_iam_role_policy_attachment" "uds_lambda_processing_policy_attachment" {
role = data.aws_iam_role.lambda_processing.name
policy_arn = aws_iam_policy.uds_lambda_processing_policy.arn
}
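This policy grants the Lambda processing role sts:AssumeRole on external roles, which is what the new set_external_role(...).publish_message(..., True) call in daac_archiver_logic.py relies on: assume the DAAC-provided role, then publish the CNM message to the DAAC's SNS topic with the temporary credentials. A hedged boto3 sketch of that flow; the ARNs are placeholders and the production implementation lives in mdps-ds-lib:

import json
import boto3

def publish_as_external_role(role_arn: str, session_name: str, topic_arn: str, message: dict):
    # exchange the Lambda processing role for short-lived credentials in the DAAC account
    credentials = boto3.client('sts').assume_role(RoleArn=role_arn, RoleSessionName=session_name)['Credentials']
    sns = boto3.client(
        'sns',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )
    return sns.publish(TopicArn=topic_arn, Message=json.dumps(message))

# placeholder ARNs matching the daac_role_arn / daac_sns_topic_arn config fields
publish_as_external_role(
    'arn:aws:iam::000000000000:role/example-daac-cnm-publisher',
    'uds-daac-archiver',
    'arn:aws:sns:us-west-2:000000000000:example-daac-cnm',
    {'identifier': 'example-granule-id'},
)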