
fix: Sending SNS to DAAC #578


Merged: 6 commits, merged on May 21, 2025
34 changes: 30 additions & 4 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -43,20 +43,37 @@ def get_cnm_response_json_file(self, potential_file, granule_id):
return None
if len(cnm_response_keys) > 1:
LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}')
cnm_response_keys = cnm_response_keys[0]
# assuming the file names share the same pattern and include the processing date, sorting them puts the most recent one last
cnm_response_keys = sorted(cnm_response_keys)[-1] # sort and get the last one which is supposed to be the most recent one.
LOGGER.debug(f'cnm_response_keys: {cnm_response_keys}')
local_file = self.__s3.set_s3_url(f's3://{self.__s3.target_bucket}/{cnm_response_keys}').download('/tmp')
cnm_response_json = FileUtils.read_json(local_file)
FileUtils.remove_if_exists(local_file)
return cnm_response_json

@staticmethod
def revert_to_s3_url(input_url):
if input_url.startswith("s3://"):
return input_url
if input_url.startswith("http://") or input_url.startswith("https://"):
parts = input_url.split('/', 3)
if len(parts) < 4:
raise ValueError(f'invalid url: {input_url}')
path_parts = parts[3].split('/', 1)
if len(path_parts) != 2:
raise ValueError(f'invalid url: {input_url}')
bucket, key = path_parts
return f"s3://{bucket}/{key}"
raise ValueError(f'unknown scheme: {input_url}')

def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
granule_files = uds_cnm_json['product']['files']
if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
return granule_files # TODO remove missing md5?
archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
result_files = []
for each_file in granule_files:
LOGGER.debug(f'each_file: {each_file}')
"""
{
"type": "data",
@@ -71,6 +88,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
if each_file['type'] not in archiving_types:
continue
file_extensions = archiving_types[each_file['type']]
each_file['uri'] = self.revert_to_s3_url(each_file['uri'])
if len(file_extensions) < 1:
result_files.append(each_file) # TODO remove missing md5?
temp_filename = each_file['name'].upper().strip()
@@ -79,28 +97,36 @@ def send_to_daac_internal(self, uds_cnm_json: dict):
return result_files

def send_to_daac_internal(self, uds_cnm_json: dict):
LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier'])  # This is normally meant for a collection identifier, but since our granule ID is prefixed with the collection ID, we can reuse it here.
self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
if daac_config is None or len(daac_config) < 1:
LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}')
return
daac_config = daac_config[0] # TODO This is currently not supporting more than 1 daac.
result = JsonValidator(UdsArchiveConfigIndex.basic_schema).validate(daac_config)
if result is not None:
raise ValueError(f'daac_config does not have a valid schema. Please re-add the DAAC config: {result} for {daac_config}')
try:
self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn'])
daac_cnm_message = {
"collection": daac_config['daac_collection_name'],
"collection": {
'name': daac_config['daac_collection_name'],
'version': daac_config['daac_data_version'],
},
"identifier": uds_cnm_json['identifier'],
"submissionTime": f'{TimeUtils.get_current_time()}Z',
"provider": granule_identifier.tenant,
"version": "1.6.0", # TODO this is hardcoded?
"product": {
"name": granule_identifier.id,
"dataVersion": daac_config['daac_data_version'],
# "dataVersion": daac_config['daac_data_version'],
'files': self.__extract_files(uds_cnm_json, daac_config),
}
}
self.__sns.publish_message(json.dumps(daac_cnm_message))
LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}')
self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True)
self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
'archive_status': 'cnm_s_success',
'archive_error_message': '',
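
The net effect of the changes in this file is a differently shaped CNM message: "collection" becomes an object carrying the DAAC collection name and version, product.dataVersion is dropped, file URIs are normalized back to s3:// form via revert_to_s3_url, and the publish is done under an assumed DAAC role. A hedged sketch of the resulting message, with every value a hypothetical placeholder:

# Hedged sketch of the CNM message shape that send_to_daac_internal now publishes.
# All identifiers, ARNs, and file names below are hypothetical placeholders.
import json

daac_cnm_message = {
    "collection": {
        "name": "EXAMPLE_DAAC_COLLECTION",     # daac_config['daac_collection_name']
        "version": "001",                      # daac_config['daac_data_version']
    },
    "identifier": "example_tenant:example_venue:EXAMPLE_COLLECTION___001:granule_001",  # hypothetical
    "submissionTime": "2025-05-21T00:00:00Z",  # f'{TimeUtils.get_current_time()}Z'
    "provider": "example_tenant",              # granule_identifier.tenant
    "version": "1.6.0",                        # still hardcoded (see TODO above)
    "product": {
        "name": "granule_001",                 # granule_identifier.id
        "files": [
            {
                "type": "data",
                "name": "granule_001.nc",
                "uri": "s3://example-bucket/path/to/granule_001.nc",  # normalized via revert_to_s3_url
            }
        ],
    },
}
print(json.dumps(daac_cnm_message, indent=2))
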
5 changes: 4 additions & 1 deletion cumulus_lambda_functions/lib/uds_db/archive_index.py
@@ -15,11 +15,14 @@ class UdsArchiveConfigIndex:
basic_schema = {
'type': 'object',
"additionalProperties": False,
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', 'daac_role_session_name',
'collection', 'ss_username', 'archiving_types'],
'properties': {
'daac_collection_id': {'type': 'string'},
'daac_sns_topic_arn': {'type': 'string'},
'daac_data_version': {'type': 'string'},
'daac_role_arn': {'type': 'string'},
'daac_role_session_name': {'type': 'string'},
'collection': {'type': 'string'},
'ss_username': {'type': 'string'},
'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
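
To make the two new required fields concrete, here is a minimal config document that passes the updated basic_schema, checked with the jsonschema package already pinned in requirements.txt. All ARNs, names, and usernames are hypothetical placeholders:

# Minimal sketch: a DAAC archive config that satisfies the updated schema.
from jsonschema import validate

basic_schema = {
    'type': 'object',
    'additionalProperties': False,
    'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn',
                 'daac_role_session_name', 'collection', 'ss_username', 'archiving_types'],
    'properties': {
        'daac_collection_id': {'type': 'string'},
        'daac_sns_topic_arn': {'type': 'string'},
        'daac_data_version': {'type': 'string'},
        'daac_role_arn': {'type': 'string'},
        'daac_role_session_name': {'type': 'string'},
        'collection': {'type': 'string'},
        'ss_username': {'type': 'string'},
        'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
    },
}

example_config = {  # hypothetical values
    'daac_collection_id': 'EXAMPLE_DAAC_COLLECTION',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:123456789012:example-daac-cnm-topic',
    'daac_data_version': '001',
    'daac_role_arn': 'arn:aws:iam::123456789012:role/example-daac-publish-role',
    'daac_role_session_name': 'uds-archiver-session',
    'collection': 'EXAMPLE_COLLECTION___001',
    'ss_username': 'example_user',
    'archiving_types': [{'data_type': 'data', 'file_extension': ['.nc', '.json']}],
}

validate(instance=example_config, schema=basic_schema)  # raises ValidationError if a new required field is missing
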
4 changes: 4 additions & 0 deletions cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py
@@ -19,13 +19,17 @@ class DaacUpdateModel(BaseModel):
daac_collection_id: str
daac_data_version: Optional[str] = None
daac_sns_topic_arn: Optional[str] = None
daac_role_arn: Optional[str] = None
daac_role_session_name: Optional[str] = None
archiving_types: Optional[list[ArchivingTypesModel]] = None


class DaacAddModel(BaseModel):
daac_collection_id: str
daac_data_version: str
daac_sns_topic_arn: str
daac_role_arn: str
daac_role_session_name: str
archiving_types: Optional[list[ArchivingTypesModel]] = []


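
The FastAPI request models gain the same two role fields: optional on update, required on add. A rough sketch of how the add model behaves under pydantic 2.9.2 (ArchivingTypesModel's real definition is not part of this diff, so the minimal version below is assumed from how archiving_types is consumed in daac_archiver_logic.py):

# Hedged sketch: exercising the new DaacAddModel fields with pydantic v2.
from typing import Optional
from pydantic import BaseModel


class ArchivingTypesModel(BaseModel):  # assumed shape, not taken from this PR
    data_type: str
    file_extension: Optional[list[str]] = None


class DaacAddModel(BaseModel):
    daac_collection_id: str
    daac_data_version: str
    daac_sns_topic_arn: str
    daac_role_arn: str
    daac_role_session_name: str
    archiving_types: Optional[list[ArchivingTypesModel]] = []


payload = {  # hypothetical request body
    'daac_collection_id': 'EXAMPLE_DAAC_COLLECTION',
    'daac_data_version': '001',
    'daac_sns_topic_arn': 'arn:aws:sns:us-west-2:123456789012:example-daac-cnm-topic',
    'daac_role_arn': 'arn:aws:iam::123456789012:role/example-daac-publish-role',
    'daac_role_session_name': 'uds-archiver-session',
    'archiving_types': [{'data_type': 'data', 'file_extension': ['.nc', '.json']}],
}
print(DaacAddModel(**payload).model_dump())  # omitting either role field would now raise a ValidationError
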
4 changes: 2 additions & 2 deletions requirements.txt
@@ -15,7 +15,7 @@ jsonschema==4.23.0
jsonschema-specifications==2023.12.1
lark==0.12.0
mangum==0.18.0
mdps-ds-lib==1.1.1.dev702
mdps-ds-lib==1.1.1.dev800
pydantic==2.9.2
pydantic_core==2.23.4
pygeofilter==0.2.4
@@ -37,4 +37,4 @@ typing_extensions==4.12.2
tzlocal==5.2
urllib3==1.26.11
uvicorn==0.30.6
xmltodict==0.13.0
xmltodict==0.13.0
25 changes: 25 additions & 0 deletions tf-module/unity-cumulus/uds_lambda_processing_role.tf
@@ -0,0 +1,25 @@
data "aws_iam_role" "lambda_processing" {
name = split("/", var.lambda_processing_role_arn)[1]
}

resource "aws_iam_policy" "uds_lambda_processing_policy" {
name = "${var.prefix}-uds_lambda_processing_policy"
description = "IAM policy for Lambda to access S3 bucket and publish to SNS topic in another account"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"sts:AssumeRole",
],
"Resource": "arn:aws:iam::*:role/*"
},
]
})
}

resource "aws_iam_role_policy_attachment" "uds_lambda_processing_policy_attachment" {
role = data.aws_iam_role.lambda_processing.name
policy_arn = aws_iam_policy.uds_lambda_processing_policy.arn
}
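
This policy only grants sts:AssumeRole to the Lambda processing role; the DAAC role's trust policy and its SNS publish permission have to exist on the DAAC side. The actual publish goes through set_external_role(...).publish_message(...) from mdps-ds-lib, whose internals are not shown in this PR. The boto3 sketch below only illustrates the assumed-role publish pattern this policy enables, with hypothetical ARNs throughout:

# Hedged boto3 sketch of cross-account publishing: assume the DAAC-provided
# role, then publish the CNM message to the DAAC SNS topic using the temporary
# credentials. This approximates what set_external_role(...).publish_message(...)
# is expected to do; it is not the mdps-ds-lib implementation.
import json
import boto3

daac_role_arn = "arn:aws:iam::123456789012:role/example-daac-publish-role"        # hypothetical
daac_role_session_name = "uds-archiver-session"                                   # hypothetical
daac_sns_topic_arn = "arn:aws:sns:us-west-2:123456789012:example-daac-cnm-topic"  # hypothetical

creds = boto3.client("sts").assume_role(
    RoleArn=daac_role_arn,
    RoleSessionName=daac_role_session_name,
)["Credentials"]

sns = boto3.client(
    "sns",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
sns.publish(TopicArn=daac_sns_topic_arn, Message=json.dumps({"example": "cnm message"}))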