feat: Adding automated ingestion lambda (#368)
* feat: some steps for cnm ingester

* feat: add default collection code

* feat: create collection is done

* feat: methods for automated ingester is done

* feat: add lambda entry point

* fix: merge conflicts

* feat: update lambda config

* fix: unquoting s3 url

* fix: not triggering final step debugging

* fix: maybe provider is missing

* fix: add big upload test

* chore: dummy commit

* fix: get docker build working again

* fix: get github action working again
wphyojpl authored May 13, 2024
1 parent 33310ae commit 8ecadd7
Showing 19 changed files with 918 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/makefile.yml
@@ -19,7 +19,7 @@ jobs:
with:
python-version: '3.9'
- run: |
python3 "${GITHUB_WORKSPACE}/setup.py" install
python3 -m pip install -r "${GITHUB_WORKSPACE}/requirements.txt"
- run: |
python3 "${GITHUB_WORKSPACE}/setup.py" install_lib
- run: |
2 changes: 1 addition & 1 deletion README.md
@@ -10,7 +10,7 @@

</div>

<pre align="center">This repository contains source code that handles data ingest, data catalog, data search and data access that complies to OGC DAPA and STAC specifications.</pre>
<pre align="center">This repository contains source code that handles data ingest, data catalog, data search and data access that complies to OGC DAPA and STAC specifications</pre>
<!-- ☝️ Replace with a single sentence describing the purpose of your repo / proj ☝️ -->

<!-- Header block for project -->
5 changes: 4 additions & 1 deletion ci.cd/Makefile
@@ -21,7 +21,7 @@ upload_lambda:
aws --profile saml-pub s3 cp cumulus_lambda_functions_deployment.zip s3://am-uds-dev-cumulus-tf-state/unity_cumulus_lambda/

upload_lambda_mcp_dev:
-	aws s3 cp cumulus_lambda_functions_deployment.zip s3://uds-dev-cumulus-public/unity_cumulus_lambda/
+	aws s3 cp tf-module/unity-cumulus/build/cumulus_lambda_functions_deployment.zip s3://uds-dev-cumulus-public/unity_cumulus_lambda/
update_lambda_function_mcp_dev_6:
aws lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket uds-dev-cumulus-public --function-name arn:aws:lambda:us-west-2:237868187491:function:uds-dev-cumulus-metadata_s4pa_generate_cmr --publish &>/dev/null
update_lambda_function_mcp_dev_7:
@@ -49,6 +49,9 @@ update_lambda_function_mcp_sbx_7:
update_lambda_function_mcp_sbx_8:
aws lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket uds-dev-cumulus-public --function-name arn:aws:lambda:us-west-2:237868187491:function:uds-sbx-cumulus-granules_to_es --publish &>/dev/null

update_lambda_function_mcp_sbx_ingester:
aws lambda update-function-code --s3-key unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip --s3-bucket uds-dev-cumulus-public --function-name arn:aws:lambda:us-west-2:237868187491:function:uds-sbx-cumulus-granules_cnm_ingester --publish &>/dev/null

mcp_sbx: upload_lambda_mcp_dev update_lambda_function_mcp_sbx_7 update_lambda_function_mcp_sbx_8

mcp_sbx_fastapi: upload_lambda_mcp_dev update_lambda_function_mcp_sbx_uds_api
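
For reference, the same update can be issued from Python; a minimal boto3 sketch (bucket, key, and function ARN copied from the update_lambda_function_mcp_sbx_ingester target above):

import boto3

# mirrors the update_lambda_function_mcp_sbx_ingester Makefile target above
lambda_client = boto3.client('lambda', region_name='us-west-2')
lambda_client.update_function_code(
    FunctionName='arn:aws:lambda:us-west-2:237868187491:function:uds-sbx-cumulus-granules_cnm_ingester',
    S3Bucket='uds-dev-cumulus-public',
    S3Key='unity_cumulus_lambda/cumulus_lambda_functions_deployment.zip',
    Publish=True,
)
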
193 changes: 193 additions & 0 deletions cumulus_lambda_functions/granules_cnm_ingester/granules_cnm_ingester_logic.py
@@ -0,0 +1,193 @@
import os
import time

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections

from cumulus_lambda_functions.stage_in_out.stage_in_out_utils import StageInOutUtils

from cumulus_lambda_functions.uds_api.dapa.collections_dapa_cnm import CollectionsDapaCnm

from cumulus_lambda_functions.cumulus_stac.unity_collection_stac import UnityCollectionStac
from cumulus_lambda_functions.uds_api.dapa.collections_dapa_creation import CollectionDapaCreation
from cumulus_lambda_functions.cumulus_stac.item_transformer import ItemTransformer
from pystac import ItemCollection, Item
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

"""
TODO: required environment variables
UNITY_DEFAULT_PROVIDER
CUMULUS_WORKFLOW_NAME
REPORT_TO_EMS
CUMULUS_WORKFLOW_SQS_URL
CUMULUS_LAMBDA_PREFIX
ES_URL
ES_PORT
SNS_TOPIC_ARN
"""
class GranulesCnmIngesterLogic:
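    """Automates CNM ingestion for a 'successful features' STAC catalog file that lands in S3."""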
def __init__(self):
self.__s3 = AwsS3()
self.__successful_features_json = None
self.__successful_features: ItemCollection = None
self.__collection_id = None
self.__chunk_size = StageInOutUtils.CATALOG_DEFAULT_CHUNK_SIZE
if 'UNITY_DEFAULT_PROVIDER' not in os.environ:
raise ValueError(f'missing UNITY_DEFAULT_PROVIDER')
self.__default_provider = os.environ.get('UNITY_DEFAULT_PROVIDER')
self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')))

@property
def successful_features_json(self):
return self.__successful_features_json

@successful_features_json.setter
def successful_features_json(self, val):
"""
:param val:
:return: None
"""
self.__successful_features_json = val
return

@property
def collection_id(self):
return self.__collection_id

@collection_id.setter
def collection_id(self, val):
"""
:param val:
:return: None
"""
self.__collection_id = val
return

@property
def successful_features(self):
return self.__successful_features

@successful_features.setter
def successful_features(self, val):
"""
:param val:
:return: None
"""
self.__successful_features = val
return

def load_successful_features_s3(self, successful_features_s3_url):
self.__s3.set_s3_url(successful_features_s3_url)
if not self.__s3.exists(self.__s3.target_bucket, self.__s3.target_key):
LOGGER.error(f'missing successful_features: {successful_features_s3_url}')
raise ValueError(f'missing successful_features: {successful_features_s3_url}')
local_successful_features = self.__s3.download('/tmp')
self.__successful_features_json = FileUtils.read_json(local_successful_features)
FileUtils.remove_if_exists(local_successful_features)
self.__successful_features = ItemCollection.from_dict(self.__successful_features_json)
return

def validate_granules(self):
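        # every asset href of every granule must already exist in S3; collect all misses before raising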
if self.successful_features is None:
raise RuntimeError(f'NULL successful_features')
missing_granules = []
for each_granule in self.successful_features.items:
missing_assets = []
for each_asset_name, each_asset in each_granule.assets.items():
temp_bucket, temp_key = self.__s3.split_s3_url(each_asset.href)
if not self.__s3.exists(temp_bucket, temp_key):
missing_assets.append({each_asset_name: each_asset.href})
if len(missing_assets) > 0:
missing_granules.append({
'granule_id': each_granule.id,
'missing_assets': missing_assets
})
if len(missing_granules) > 0:
LOGGER.error(f'missing_granules: {missing_granules}')
raise ValueError(f'missing_granules: {missing_granules}')
return

def extract_collection_id(self):
if self.successful_features is None:
raise RuntimeError(f'NULL successful_features')
if len(self.successful_features.items) < 1:
LOGGER.error(f'not required to process. No Granules: {self.successful_features.to_dict(False)}')
return
self.collection_id = self.successful_features.items[0].collection_id
return

def has_collection(self):
uds_collection_result = self.__uds_collection.get_collection(self.collection_id)
return len(uds_collection_result) > 0

def create_collection(self):
if self.collection_id is None:
raise RuntimeError(f'NULL collection_id')
if self.has_collection():
LOGGER.debug(f'{self.collection_id} already exists. continuing..')
return
# ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py
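        # NOTE: granule-id and file-type rules below are hardcoded for the test dataset (test_file*)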
dapa_collection = UnityCollectionStac() \
.with_id(self.collection_id) \
.with_graule_id_regex("^test_file.*$") \
.with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \
.with_title(f'Collection: {self.collection_id}') \
.with_process('stac') \
.with_provider(self.__default_provider) \
.add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \
.add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \
.add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \
.add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \
.add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata', 'item')

stac_collection = dapa_collection.start()
creation_result = CollectionDapaCreation(stac_collection).create()
if creation_result['statusCode'] >= 400:
raise RuntimeError(f'failed to create collection: {self.collection_id}. details: {creation_result["body"]}')
time.sleep(3) # cool off period before checking DB
if not self.has_collection():
LOGGER.error(f'missing collection. (failed to create): {self.collection_id}')
raise ValueError(f'missing collection. (failed to create): {self.collection_id}')
return

def send_cnm_msg(self):
LOGGER.debug(f'starting ingest_cnm_dapa_actual')
try:
errors = []
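            # send features in chunks; features from any failed chunk are collected and re-raised at the end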
for i, features_chunk in enumerate(StageInOutUtils.chunk_list(self.successful_features_json['features'], self.__chunk_size)):
try:
LOGGER.debug(f'working on chunk_index {i}')
dapa_body = {
"provider_id": self.__default_provider,
"features": features_chunk
}
collections_dapa_cnm = CollectionsDapaCnm(dapa_body)
cnm_result = collections_dapa_cnm.start()
if cnm_result['statusCode'] != 200:
errors.extend(features_chunk)
except Exception as e1:
LOGGER.exception(f'failed to queue CNM process.')
errors.extend(features_chunk)
except Exception as e:
LOGGER.exception('failed to ingest to CNM')
raise ValueError(f'failed to ingest to CNM: {e}')
if len(errors) > 0:
raise RuntimeError(f'failures during CNM ingestion: {errors}')
return

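    # end-to-end flow: unwrap the SQS -> SNS -> S3 notification, load the features file from S3,
    # validate granule assets, ensure the collection exists, then queue CNM messages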
def start(self, event):
LOGGER.debug(f'event: {event}')
sns_msg = AwsMessageTransformers().sqs_sns(event)
s3_details = AwsMessageTransformers().get_s3_from_sns(sns_msg)
s3_url = f's3://{s3_details["bucket"]}/{s3_details["key"]}'
self.load_successful_features_s3(s3_url)
self.validate_granules()
self.extract_collection_id()
self.create_collection()
self.send_cnm_msg()
return
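
A hypothetical local smoke test of the logic class (environment values and the S3 URL below are placeholders, not real endpoints):

import os

# placeholder values; real deployments inject these via the lambda configuration
os.environ.setdefault('UNITY_DEFAULT_PROVIDER', 'unity')
os.environ.setdefault('ES_URL', 'https://example-es.us-west-2.es.amazonaws.com')
os.environ.setdefault('ES_PORT', '443')

from cumulus_lambda_functions.granules_cnm_ingester.granules_cnm_ingester_logic import GranulesCnmIngesterLogic

ingester = GranulesCnmIngesterLogic()
ingester.load_successful_features_s3('s3://example-staging-bucket/successful_features.json')  # placeholder URL
ingester.validate_granules()
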
@@ -1,4 +1,6 @@
import json

from cumulus_lambda_functions.granules_cnm_ingester.granules_cnm_ingester_logic import GranulesCnmIngesterLogic
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator


@@ -7,7 +9,8 @@ def lambda_handler(event, context):
:param event:
:param context:
:return:
{'Records': [{'messageId': '6ff7c6fd-4053-4ab4-bc12-c042be1ed275', 'receiptHandle': 'AQEBYASiFPjQT5JBI2KKCTF/uQhHfJt/tHhgucslQQdvkNVxcXCNi2E5Ux4U9N0eu7RfvlnvtycjUh0gdL7jIeoyH+VRKSF61uAJuT4p31BsNe0GYu49N9A6+kxjP/RrykR7ZofmQRdHToX1ugRc76SMRic4H/ZZ89YAHA2QeomJFMrYywIxlk8OAzYaBf2dQI7WexjY5u1CW00XNMbTGyTo4foVPxcSn6bdFpfgxW/L7yJMX/0YQvrA9ruiuQ+lrui+6fWYh5zEk3f5v1bYtUQ6DtyyfbtMHZQJTJpUlWAFRzzN+3melilH7FySyOGDXhPb0BOSzmdKq9wBbfLW/YPb7l99ejq4GfRfj8LyI4EtB96vTeUw4LCgUqbZcBrxbGBLUXMacweh+gCjHav9ylqr2SeOiqG3vWPq9pwFYQIDqNE=', 'body': '{\n "Type" : "Notification",\n "MessageId" : "33e1075a-435c-5217-a33d-59fae85e19b2",\n "TopicArn" : "arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester",\n "Subject" : "Amazon S3 Notification",\n "Message" : "{\\"Service\\":\\"Amazon S3\\",\\"Event\\":\\"s3:TestEvent\\",\\"Time\\":\\"2024-04-22T18:13:22.416Z\\",\\"Bucket\\":\\"uds-sbx-cumulus-staging\\",\\"RequestId\\":\\"DQ4T0GRVFPSX45C9\\",\\"HostId\\":\\"gHBFnYNmfnGDZBmqoQwA3RScjtjBk5lr426moGxu8IDpe5UhWAqNTxHqilWBoPN1njzIrzNrf8c=\\"}",\n "Timestamp" : "2024-04-22T18:13:22.434Z",\n "SignatureVersion" : "1",\n "Signature" : "RvSxqpU7J7CCJXbin9cXqTxzjMjgAUFtk/n454mTMcOe5x3Ay1w4AHfzyeYQCFBdLHNBa8n3OdMDoDlJqyVQMb8k+nERaiZWN2oqFVDRqT9pqSr89b+4FwlhPv6TYy2pBa/bgjZ4cOSYsey1uSQ3hjl0idfssvuV5cCRxQScbA+yu8Gcv9K7Oqgy01mC0sDHiuPIifhFXxupG5ygbjqoHIB+1gdMEbBwyixoY5GOpHM/O2uHNF+dJDjax1WMxQ2FzVjiFeCa+tNcjovF059+tx2v1YmDq/kEAFrN6DAtP6R4zKag62P9jkvjU/wHYJ2jjXmZAqoG+nuzAo24HiZPSw==",\n "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",\n "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester:76cbefa1-addf-45c2-97e1-ae16986b195b"\n}', 'attributes': {'ApproximateReceiveCount': '1', 'SentTimestamp': '1713809602474', 'SenderId': 'AIDAIYLAVTDLUXBIEIX46', 'ApproximateFirstReceiveTimestamp': '1713809602483'}, 'messageAttributes': {}, 'md5OfBody': 'c6d06d1b742ad5bd2cfe5f542640aad2', 'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:us-west-2:237868187491:uds-sbx-cumulus-granules_cnm_ingester', 'awsRegion': 'us-west-2'}]}
"""
LambdaLoggerGenerator.remove_default_handlers()
print(f'event: {event}')
-    raise NotImplementedError('Require implementation later')
+    GranulesCnmIngesterLogic().start(event)
return
56 changes: 39 additions & 17 deletions cumulus_lambda_functions/lib/aws/aws_message_transformers.py
@@ -1,4 +1,5 @@
import json
from urllib.parse import unquote

from cumulus_lambda_functions.lib.json_validator import JsonValidator

@@ -29,8 +30,15 @@ class AwsMessageTransformers:
"Type": {"type": "string"},
"MessageId": {"type": "string"},
"TopicArn": {"type": "string"},
"Subject": {"type": "string"},
"Timestamp": {"type": "string"},
"SignatureVersion": {"type": "string"},
"Signature": {"type": "string"},
"SigningCertURL": {"type": "string"},
"UnsubscribeURL": {"type": "string"},
"Message": {"type": "string"},
}
},
"required": ["Message"]
}

S3_RECORD_SCHEMA = {
@@ -41,22 +49,25 @@
'maxItems': 1,
'items': {
'type': 'object',
-                'properties': {'s3': {
-                    'type': 'object',
-                    'properties': {
-                        'bucket': {
-                            'type': 'object',
-                            'properties': {'name': {'type': 'string', 'minLength': 1}},
-                            'required': ['name']
-                        },
-                        'object': {
-                            'type': 'object',
-                            'properties': {'key': {'type': 'string', 'minLength': 1}},
-                            'required': ['key']
-                        }},
-                    'required': ['bucket', 'object']
-                }},
-                'required': ['s3']
+                'properties': {
+                    'eventName': {'type': 'string'},
+                    's3': {
+                        'type': 'object',
+                        'properties': {
+                            'bucket': {
+                                'type': 'object',
+                                'properties': {'name': {'type': 'string', 'minLength': 1}},
+                                'required': ['name']
+                            },
+                            'object': {
+                                'type': 'object',
+                                'properties': {'key': {'type': 'string', 'minLength': 1}},
+                                'required': ['key']
+                            }},
+                        'required': ['bucket', 'object']
+                    }
+                },
+                'required': ['eventName', 's3']
}},
'required': ['Records']
@@ -74,3 +85,14 @@ def sqs_sns(self, raw_msg: json):
sns_msg_body = sqs_msg_body['Message']
sns_msg_body = json.loads(sns_msg_body)
return sns_msg_body

def get_s3_from_sns(self, sns_msg_body):
result = JsonValidator(self.S3_RECORD_SCHEMA).validate(sns_msg_body)
if result is not None:
            raise ValueError(f'sns_msg_body did not pass S3_RECORD_SCHEMA: {result}')
s3_summary = {
'eventName': sns_msg_body['Records'][0]['eventName'],
'bucket': sns_msg_body['Records'][0]['s3']['bucket']['name'],
'key': unquote(sns_msg_body['Records'][0]['s3']['object']['key'].replace('+', ' ')),
}
return s3_summary
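
Combined with sqs_sns(), the unwrap chain for a bucket notification becomes (a sketch; sqs_event stands for the raw lambda event):

from cumulus_lambda_functions.lib.aws.aws_message_transformers import AwsMessageTransformers

transformer = AwsMessageTransformers()
sns_msg = transformer.sqs_sns(sqs_event)            # SQS record -> parsed SNS body
s3_details = transformer.get_s3_from_sns(sns_msg)   # -> {'eventName', 'bucket', 'key'}
s3_url = f"s3://{s3_details['bucket']}/{s3_details['key']}"
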
7 changes: 7 additions & 0 deletions cumulus_lambda_functions/lib/aws/aws_s3.py
@@ -58,6 +58,13 @@ def __upload_to_s3(self, bucket, prefix, file_path, delete_files=False, add_size
raise e
return f's3://{bucket}/{s3_key}'

def exists(self, base_path: str, relative_path: str):
try:
            self.__s3_client.head_object(Bucket=base_path, Key=relative_path)
        except Exception:  # head_object raises ClientError (404) when the key is absent
return False
return True

def upload(self, file_path: str, base_path: str, relative_parent_path: str, delete_files: bool,
s3_name: Union[str, None] = None, obj_tags: dict = {}, overwrite: bool = False):
s3_url = self.__upload_to_s3(base_path, relative_parent_path, file_path, delete_files, True, obj_tags, s3_name)
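
A quick usage sketch for the new helper (bucket and key are placeholder names):

from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3

s3 = AwsS3()
if not s3.exists('example-bucket', 'path/to/granule.nc'):  # placeholders
    raise ValueError('asset missing from S3')
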
19 changes: 19 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/uds_collections.py
@@ -70,6 +70,25 @@ def add_collection(self, collection_id: str, start_time: int, end_time: int, bbo
self.__es.index_one(indexing_dict, collection_id, DBConstants.collections_index)
return self

def get_collection(self, collection_id: str):
authorized_collection_ids_dsl = {
'size': 20,
'query': {
'bool': {
'must': [
{'term': {DBConstants.collection_id: {'value': collection_id}}}
]
}
},
'sort': [
{DBConstants.collection_id: {'order': 'asc'}}
]
}
LOGGER.debug(f'authorized_collection_ids_dsl: {authorized_collection_ids_dsl}')
authorized_collection_ids = self.__es.query(authorized_collection_ids_dsl, DBConstants.collections_index)
authorized_collection_ids = [k['_source'] for k in authorized_collection_ids['hits']['hits']]
return authorized_collection_ids

def get_collections(self, collection_regex: list):
# temp_dsl = {
# 'query': {'match_all': {}},
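
get_collection() returns the matching _source documents, so a presence check reduces to a non-empty-list test (a sketch; the collection ID is a placeholder):

import os

from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections

uds_collections = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')))
hits = uds_collections.get_collection('URN:NASA:UNITY:project:venue:collection___001')  # placeholder ID
if hits:
    print(hits[0])  # first matching collection document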