Skip to content

Commit

Permalink
feat: add CNM Response Archival lambda (#369)
Browse files Browse the repository at this point in the history
* feat: add additional lambda

* fix: urllib has some library clash in github actions

* fix: still not working

* fix: still not working

* fix: still not working

* fix: fixing github action

* fix: undo setup.py

* fix: forgot to call s3 url extraction

* feat: add test case

* feat: add diagrams

* fix: remove extra arrow
  • Loading branch information
wphyojpl authored May 13, 2024
1 parent ffa9736 commit 7143641
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 1 deletion.
4 changes: 4 additions & 0 deletions ci.cd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ update_lambda_function_mcp_sbx_7:
update_lambda_function_mcp_sbx_8:
aws lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket uds-dev-cumulus-public --function-name arn:aws:lambda:us-west-2:237868187491:function:uds-sbx-cumulus-granules_to_es --publish &>/dev/null


update_lambda_function_uds-sbx-cumulus-granules_cnm_response_writer:
aws lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket uds-dev-cumulus-public --function-name arn:aws:lambda:us-west-2:237868187491:function:uds-sbx-cumulus-granules_cnm_response_writer --publish &>/dev/null

update_lambda_function_mcp_sbx_ingester:
aws lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket uds-dev-cumulus-public --function-name arn:aws:lambda:us-west-2:237868187491:function:uds-sbx-cumulus-granules_cnm_ingester --publish &>/dev/null

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import json

from cumulus_lambda_functions.lib.json_validator import JsonValidator
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class CnmResultWriter:
def __init__(self):
self.__s3 = AwsS3()
self.__cnm_response_schema = {
'type': 'object',
'required': ['collection', 'product', 'submissionTime'],
'properties': {
'submissionTime': {'type': 'string'},
'collection': {'type': 'string'},
'product': {
'type': 'object',
'required': ['name', 'files'],
'properties': {
'name': {'type': 'string'},
'files': {
'type': 'array',
'minItems': 1,
'items': {
'type': 'object',
'required': ['name', 'uri'],
'properties': {
'name': {'type': 'string'},
'uri': {'type': 'string'},
}
},
}
}
}
}
}
self.__cnm_response = {}
self.__s3_url = None

@property
def s3_url(self):
return self.__s3_url

@s3_url.setter
def s3_url(self, val):
"""
:param val:
:return: None
"""
self.__s3_url = val
return

@property
def cnm_response(self):
return self.__cnm_response

@cnm_response.setter
def cnm_response(self, val):
"""
:param val:
:return: None
"""
self.__cnm_response = val
return

def extract_s3_location(self):
result = JsonValidator(self.__cnm_response_schema).validate(self.cnm_response)
if result is not None:
LOGGER.error(f'invalid JSON: {result}. request_body: {self.cnm_response}')
raise ValueError(f'invalid JSON: {result}. request_body: {self.cnm_response}')
response_filename = f'{self.cnm_response["product"]["name"]}.{self.cnm_response["submissionTime"]}.cnm.json'
parsed_url = self.cnm_response['product']['files'][0]['uri'].split('//')[1]
s3_url = parsed_url.split('/')
s3_url[-1] = response_filename
self.__s3_url = 's3://' + '/'.join(s3_url[1:])
LOGGER.debug(f'extracted s3_url: {self.__s3_url}')
return self

def start(self, event):
LOGGER.debug(f'event: {event}')
sns_msg = AwsMessageTransformers().sqs_sns(event)
LOGGER.debug(f'sns_msg: {sns_msg}')
self.cnm_response = sns_msg
self.extract_s3_location()
self.__s3.set_s3_url(self.s3_url).upload_bytes(json.dumps(self.cnm_response, indent=4).encode())
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import json

from cumulus_lambda_functions.granules_cnm_response_writer.cnm_result_writer import CnmResultWriter
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


def lambda_handler(event, context):
"""
:param event:
:param context:
:return:
"""
LambdaLoggerGenerator.remove_default_handlers()
CnmResultWriter().start(event)
return
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from unittest import TestCase

from cumulus_lambda_functions.granules_cnm_response_writer.cnm_result_writer import CnmResultWriter


class TestCnmResultWriter(TestCase):
def test_01(self):
sample_msg = {
"collection": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT",
"identifier": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05",
"submissionTime": "2024-05-01T13:35:23.796366",
"provider": "unity",
"version": "1.6.0",
"product": {
"dataVersion": "2403261440",
"files": [
{
"type": "data",
"name": "abcd.1234.efgh.test_file05.nc",
"uri": "https://uds-distribution-placeholder/uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05/abcd.1234.efgh.test_file05.nc",
"checksumType": "md5",
"checksum": "unknown",
"size": -1
},
{
"type": "metadata",
"name": "abcd.1234.efgh.test_file05.nc.cas",
"uri": "https://uds-distribution-placeholder/uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05/abcd.1234.efgh.test_file05.nc.cas",
"checksumType": "md5",
"checksum": "unknown",
"size": -1
},
{
"type": "metadata",
"name": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05.cmr.xml",
"uri": "https://uds-distribution-placeholder/uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05.cmr.xml",
"checksumType": "md5",
"checksum": "63e0f7f9b76d56189267c854b67cd91b",
"size": 1858
}
],
"name": "URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05"
},
"receivedTime": "2024-05-01T13:37:29.643Z",
"response": {
"status": "SUCCESS"
},
"processCompleteTime": "2024-05-01T13:38:01.676Z"
}
test = CnmResultWriter()
test.cnm_response = sample_msg
test.extract_s3_location()
self.assertEqual('s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05/URN:NASA:UNITY:UDS_LOCAL_TEST:DEV:SNDR-SNPP_ATMS@L1B$OUTPUT___2403261440:abcd.1234.efgh.test_file05.2024-05-01T13:35:23.796366.cnm.json', test.s3_url)
return
18 changes: 17 additions & 1 deletion tests/integration_tests/test_custom_metadata_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import pystac
import requests
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

from cumulus_lambda_functions.lib.time_utils import TimeUtils
from pystac import Link, Catalog, Asset, Item, ItemCollection

Expand Down Expand Up @@ -41,7 +43,7 @@ def setUp(self) -> None:
self.tenant = 'UDS_LOCAL_TEST' # 'uds_local_test' # 'uds_sandbox'
self.tenant_venue = 'DEV' # 'DEV1' # 'dev'
self.collection_name = 'UDS_COLLECTION' # 'uds_collection' # 'sbx_collection'
self.collection_version = '24.04.25.09.00'.replace('.', '') # '2402011200'
self.collection_version = '24.05.01.14.00'.replace('.', '') # '2402011200'

self.custom_metadata_body = {
'tag': {'type': 'keyword'},
Expand Down Expand Up @@ -494,6 +496,20 @@ def test_06_03_retrieve_granule_filter_no_result(self):
self.assertTrue(len(response_json['features']) == 0, f'empty granules. Need collections to compare')
return

def test_07_check_cnm_response(self):
os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging'
temp_collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}___{self.collection_version}'
s3 = AwsS3()
child_files = [k for k in s3.get_child_s3_files(os.environ['STAGING_BUCKET'], f'{temp_collection_id}/{temp_collection_id}:{self.granule_id}')]
cnm_response = [k for k in child_files if k[0].endswith('.cnm.json')]
self.assertEqual(len(cnm_response), 1)
with tempfile.TemporaryDirectory() as tmp_dir_name:
local_file_path = s3.set_s3_url(f's3://{os.environ["STAGING_BUCKET"]}/{cnm_response[0][0]}').download(tmp_dir_name)
cnm_response = FileUtils.read_json(local_file_path)
# NOTE: CNM response do not have collection version
self.assertEqual(cnm_response['collection'], f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}', f'wrong collection ID')
return

def test_01_pagination(self):
temp_collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}___{self.collection_version}'
post_url = f'{self._url_prefix}/collections/{temp_collection_id}/items?limit=2'
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions tf-module/unity-cumulus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## Terraform Guide on what are being deployed as there are various components.

### REST API
- Source: https://www.figma.com/file/dRmKfOzPC2NKACc0NL29bT/DAPA-API-Infrastructure
![DAPA API Infrastructure.png](DAPA%20API%20Infrastructure.png)
### Auto Ingestion Workflow
- Source: https://www.figma.com/file/5kI41JOXP1WPuC4veGCEgm/auto-ingestion-workflow-infrastructure
![auto-ingestion workflow infrastructure.png](auto-ingestion%20workflow%20infrastructure.png)

### CMR metadata generators
- 3 Lambdas which are used in CNM Step function definitions stored in cumulus-template-deploy terraform.
- Example: STAC [Metadata](https://github.jpl.nasa.gov/unity-uds/cumulus-template-deploy/blob/master/cumulus-tf/catalog_granule_workflow.asl.json#L216) Extraction
### Custom Metadata Ingestion
- Source: https://www.figma.com/file/mKxRrAlvKmrR5DULHD2yah/Custom-metadata-Ingestion-Infrastructure
![Custom metadata Ingestion Infrastructure.png](Custom%20metadata%20Ingestion%20Infrastructure.png)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
59 changes: 59 additions & 0 deletions tf-module/unity-cumulus/granules_cnm_ingester.tf
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,62 @@ resource "aws_lambda_event_source_mapping" "granules_cnm_ingester_queue_lambda_t
batch_size = 1
enabled = true
}
################# << CNM Response Writer >> ########################

resource "aws_lambda_function" "granules_cnm_response_writer" {
filename = local.lambda_file_name
source_code_hash = filebase64sha256(local.lambda_file_name)
function_name = "${var.prefix}-granules_cnm_response_writer"
role = var.lambda_processing_role_arn
handler = "cumulus_lambda_functions.granules_cnm_response_writer.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
reserved_concurrent_executions = var.granules_cnm_response_writer__lambda_concurrency # TODO
environment {
variables = {
LOG_LEVEL = var.log_level
SNS_TOPIC_ARN = var.cnm_sns_topic_arn
}
}

vpc_config {
subnet_ids = var.cumulus_lambda_subnet_ids
security_group_ids = local.security_group_ids_set ? var.security_group_ids : [aws_security_group.unity_cumulus_lambda_sg[0].id]
}
tags = var.tags
}

data "aws_sns_topic" "granules_cnm_response_topic" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic.html
name = var.granules_cnm_response_topic
}

resource "aws_sqs_queue" "granules_cnm_response_writer" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue
name = "${var.prefix}-granules_cnm_response_writer"
delay_seconds = 0
max_message_size = 262144
message_retention_seconds = 345600
visibility_timeout_seconds = var.granules_cnm_ingester__sqs_visibility_timeout_seconds // Used as cool off time in seconds. It will wait for 5 min if it fails
receive_wait_time_seconds = 0
policy = templatefile("${path.module}/sqs_policy.json", {
region: var.aws_region,
roleArn: var.lambda_processing_role_arn,
accountId: local.account_id,
sqsName: "${var.prefix}-granules_cnm_response_writer",
})
tags = var.tags
}

resource "aws_sns_topic_subscription" "granules_cnm_response_writer_subscription" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic_subscription
topic_arn = data.aws_sns_topic.granules_cnm_response_topic.arn
protocol = "sqs"
endpoint = aws_sqs_queue.granules_cnm_response_writer.arn
# filter_policy_scope = "MessageBody" // MessageAttributes. not using attributes
# filter_policy = templatefile("${path.module}/ideas_api_job_results_filter_policy.json", {})
}

resource "aws_lambda_event_source_mapping" "granules_cnm_response_writer_lambda_trigger" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_event_source_mapping#sqs
event_source_arn = aws_sqs_queue.granules_cnm_response_writer.arn
function_name = aws_lambda_function.granules_cnm_response_writer.arn
batch_size = 1
enabled = true
}
11 changes: 11 additions & 0 deletions tf-module/unity-cumulus/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ variable "granules_cnm_ingester__lambda_concurrency" {
description = "How many Lambdas can be executed for CNM ingester concurrently"
}

variable "granules_cnm_response_writer__lambda_concurrency" {
type = number
default = 20
description = "How many Lambdas can be executed for CNM Response Writer concurrently"
}

variable "granules_cnm_response_topic" {
type = string
description = "Name of CNM Response SNS Topic"
}

variable "granules_cnm_ingester__bucket_notification_prefix" {
type = string
default = "stage_out"
Expand Down

0 comments on commit 7143641

Please sign in to comment.