Skip to content

fix: Collection deletion - Adding Cumulus Execution Deletions #597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions cumulus_lambda_functions/cumulus_wrapper/query_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list):
collection_names = [k.split('___')[0] for k in collection_ids]
self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}')
return self

def get_size(self, private_api_prefix: str):
query_params = {'field': 'status', 'type': 'collections'}
main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}
Expand Down Expand Up @@ -191,6 +192,75 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
return {'server_error': f'error while invoking:{str(e)}'}
return {'status': query_result['message']}

def delete_executions(self, new_collection: dict, private_api_prefix: str, es_batch_size: int = 10000, db_batch_size: int = 50000):
    """
    Request a bulk deletion of the executions recorded for one collection.

    Equivalent request:
    curl --request POST "$CUMULUS_BASEURL/executions/bulk-delete-by-collection" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{"collectionId": "<name>___<version>", "esBatchSize": 10000, "dbBatchSize": 50000}'

    :param new_collection: collection document; its "name" and "version" values are joined with "___" to form the collectionId
    :param private_api_prefix: forwarded unchanged to self._invoke_api
    :param es_batch_size: value sent as "esBatchSize"; the default is the previously hard-coded 10000
    :param db_batch_size: value sent as "dbBatchSize"; the default is the previously hard-coded 50000
    :return: {'status': <parsed response body>} when the response body contains 'id';
             {'client_error': <raw response>} for a status code >= 400;
             {'server_error': ...} for a status code >= 500, a body without 'id', or any exception raised while invoking
    """
    request_body = {
        'collectionId': f'{new_collection["name"]}___{new_collection["version"]}',
        'esBatchSize': es_batch_size,
        'dbBatchSize': db_batch_size,
    }
    payload = {
        'httpMethod': 'POST',
        'resource': '/{proxy+}',
        'path': '/executions/bulk-delete-by-collection',
        'headers': {
            'Content-Type': 'application/json',
        },
        'body': json.dumps(request_body),
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        # Sample failure response: {'statusCode': 500, 'body': '', 'headers': {}}
        query_result = self._invoke_api(payload, private_api_prefix)
        # The 5xx check must come first; otherwise the >= 400 test would also match server errors.
        if query_result['statusCode'] >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if query_result['statusCode'] >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        # A body without 'id' is reported as a server error rather than as success.
        if 'id' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        # Callers get an error dictionary instead of an exception, so every failure is caught here.
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'status': query_result}

def list_executions(self, new_collection: dict, private_api_prefix: str, limit: int = 100):
    """
    List the executions recorded for one collection.

    Equivalent request:
    curl --request GET "$CUMULUS_BASEURL/executions?limit=100&collectionId=<name>___<version>" --header "Authorization: Bearer $cumulus_token"

    :param new_collection: collection document; its "name" and "version" values are joined with "___" to form the collectionId
    :param private_api_prefix: forwarded unchanged to self._invoke_api
    :param limit: value sent (as a string) in the "limit" query parameter; the default is the previously hard-coded 100
    :return: {'results': <"results" entry of the parsed body>} on success;
             {'client_error': <raw response>} for a status code >= 400;
             {'server_error': ...} for a status code >= 500, a body without 'results', or any exception raised while invoking
    """
    collection_id = f'{new_collection["name"]}___{new_collection["version"]}'
    payload = {
        'httpMethod': 'GET',
        'resource': '/{proxy+}',
        'path': '/executions',
        # Query-string values are sent as strings, matching the original '100' literal.
        'queryStringParameters': {'limit': str(limit), 'collectionId': collection_id},
        'headers': {
            'Content-Type': 'application/json',
        },
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        # Sample failure response: {'statusCode': 500, 'body': '', 'headers': {}}
        query_result = self._invoke_api(payload, private_api_prefix)
        # The 5xx check must come first; otherwise the >= 400 test would also match server errors.
        if query_result['statusCode'] >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if query_result['statusCode'] >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'results' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        # Callers get an error dictionary instead of an exception, so every failure is caught here.
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'results': query_result['results']}

def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
"""
curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{
Expand Down
117 changes: 54 additions & 63 deletions cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from time import sleep
from typing import Optional

import pystac
Expand Down Expand Up @@ -81,68 +82,17 @@ def __init__(self, request_body):
self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True)
self.__cumulus_collection_query = CollectionsQuery('', '')

def __delete_collection_cumulus(self, cumulus_collection_doc):
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
if 'status' not in delete_result:
LOGGER.error(f'status not in creation_result: {delete_result}')
def analyze_cumulus_result(self, cumulus_request_result):
if 'status' not in cumulus_request_result:
LOGGER.error(f'status not in cumulus_request_result: {cumulus_request_result}')
return {
'statusCode': 500,
'body': {
'message': delete_result
'message': cumulus_request_result
}
}, None
return None, delete_result
return None, cumulus_request_result

def __create_collection_cumulus(self, cumulus_collection_doc):
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
if 'status' not in creation_result:
LOGGER.error(f'status not in creation_result: {creation_result}')
return {
'statusCode': 500,
'body': {
'message': creation_result
}
}, None
return None, creation_result

def __create_rules_cumulus(self, cumulus_collection_doc):
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
cumulus_collection_doc,
self.__cumulus_lambda_prefix,
self.__ingest_sqs_url,
self.__provider_id,
self.__workflow_name,
)
if 'status' not in rule_creation_result:
LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_creation_result}')
delete_collection_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix,
cumulus_collection_doc['name'],
cumulus_collection_doc['version'])
self.__uds_collection.delete_collection(self.__collection_transformer.get_collection_id())
return {
'statusCode': 500,
'body': {
'message': rule_creation_result,
'details': f'collection deletion result: {delete_collection_result}'
}
}
return None

def __delete_rules_cumulus(self, cumulus_collection_doc):
rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(
cumulus_collection_doc,
self.__cumulus_lambda_prefix
)
if 'status' not in rule_deletion_result:
LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_deletion_result}')
return {
'statusCode': 500,
'body': {
'message': rule_deletion_result,
'details': f'collection deletion result: {rule_deletion_result}'
}
}
return None

def __delete_collection_uds(self):
try:
Expand Down Expand Up @@ -189,17 +139,36 @@ def __create_collection_uds(self, cumulus_collection_doc):
def delete(self):
deletion_result = {}
try:

cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
LOGGER.debug(f'__provider_id: {self.__provider_id}')
creation_result = 'NA'

if self.__include_cumulus:
rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc)
deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded'
delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc)
result = self.__cumulus_collection_query.list_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
LOGGER.debug(f'execution list result: {result}')
if len(result['results']) > 0:
self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
return {
'statusCode': 409,
'body': {
'message': f'There are cumulus executions for this collection. Deleting them. Pls try again in a few minutes.',
}
}
# self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
delete_err, delete_result = self.analyze_cumulus_result(delete_result)
if delete_err is not None:
LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}')
# self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
delete_err, delete_result = self.analyze_cumulus_result(delete_result)
deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result
else:
deletion_result['cumulus_executions_deletion'] = 'NA'
deletion_result['cumulus_rule_deletion'] = 'NA'
deletion_result['cumulus_collection_deletion'] = 'NA'

Expand All @@ -222,23 +191,45 @@ def delete(self):
}
}

def __delete_collection_rule(self, cumulus_collection_doc, deletion_result):
    """
    Delete the SQS rules of a collection and record the outcome in deletion_result.

    The outcome is stored under the 'cumulus_rule_deletion' key. When that key
    already holds a value without a 'statusCode' entry, nothing is called and
    the stored value is kept, which makes a repeated call a no-op.
    NOTE(review): presumably only the error dictionary built by
    analyze_cumulus_result carries 'statusCode' - confirm.

    :param cumulus_collection_doc: collection document handed to delete_sqs_rules
    :param deletion_result: dictionary updated in place with the outcome
    :return: None
    """
    outcome_key = 'cumulus_rule_deletion'
    if outcome_key in deletion_result:
        if 'statusCode' not in deletion_result[outcome_key]:
            # A previous attempt was recorded without an error marker; do not call again.
            return None
    raw_response = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix)
    failure, success = self.analyze_cumulus_result(raw_response)
    if failure is None:
        deletion_result[outcome_key] = success
    else:
        deletion_result[outcome_key] = failure
    return None

def __delete_collection_execution(self, cumulus_collection_doc, deletion_result):
    """
    Trigger deletion of a collection's executions and record the outcome.

    The outcome (error or result, as split by analyze_cumulus_result) is stored
    under the 'cumulus_executions_deletion' key, after which the call blocks for
    10 seconds before returning.
    NOTE(review): the pause presumably gives the deletion time to progress - confirm.

    :param cumulus_collection_doc: collection document handed to delete_executions
    :param deletion_result: dictionary updated in place with the outcome
    :return: None
    """
    raw_response = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
    failure, success = self.analyze_cumulus_result(raw_response)
    if failure is None:
        recorded = success
    else:
        recorded = failure
    deletion_result['cumulus_executions_deletion'] = recorded
    sleep(10)
    return None
def create(self):
try:
cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
LOGGER.debug(f'__provider_id: {self.__provider_id}')
creation_result = 'NA'
if self.__include_cumulus:
creation_err, creation_result = self.__create_collection_cumulus(cumulus_collection_doc)
creation_cumulus_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
creation_err, creation_result = self.analyze_cumulus_result(creation_cumulus_result)
if creation_err is not None:
return creation_err
uds_creation_result = self.__create_collection_uds(cumulus_collection_doc)
if uds_creation_result is not None:
return uds_creation_result
if self.__include_cumulus:
create_rule_result = self.__create_rules_cumulus(cumulus_collection_doc)
if create_rule_result is not None:
return create_rule_result
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
cumulus_collection_doc,
self.__cumulus_lambda_prefix,
self.__ingest_sqs_url,
self.__provider_id,
self.__workflow_name,
)
create_rule_err, create_rule_result = self.analyze_cumulus_result(rule_creation_result)
if create_rule_err is not None:
return create_rule_err
# validation_result = pystac.Collection.from_dict(self.__request_body).validate()
# cumulus_collection_query = CollectionsQuery('', '')
#
Expand Down
Loading