Skip to content

feat: Empty collection delete #591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions cumulus_lambda_functions/cumulus_wrapper/query_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,39 @@ def query_rules(self, private_api_prefix: str):
return {'server_error': f'error while invoking:{str(e)}'}
return {'results': query_result}

def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
    """Delete the SQS ingestion rule for a collection via the Cumulus private API.

    Equivalent to:
        curl --request DELETE https://example.com/rules/repeat_test \
             --header 'Authorization: Bearer ReplaceWithTheToken'

    Returns {'status': <message>} on success, or a dict keyed by
    'server_error' / 'client_error' describing the failure.
    """
    # Any character that is not alphanumeric or underscore becomes 3 underscores,
    # matching the naming scheme used when the rule was created.
    sanitized_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"])
    rule_name = f'{sanitized_name}___{new_collection["version"]}___rules_sqs'
    payload = {
        'httpMethod': 'DELETE',
        'resource': '/{proxy+}',
        'path': f'/{self.__rules_key}/{rule_name}',
        'headers': {
            'Content-Type': 'application/json',
        },
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        query_result = self._invoke_api(payload, private_api_prefix)
        # Example failure shape: {'statusCode': 500, 'body': '', 'headers': {}}
        status_code = query_result['statusCode']
        if status_code >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if status_code >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'message' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'status': query_result['message']}

def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
"""
curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{
Expand Down
11 changes: 11 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/granules_db_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,17 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str
# TODO validate custom metadata vs the latest index to filter extra items
return

def get_size(self, tenant: str, tenant_venue: str, collection_id: str):
    """Return the number of granules indexed for *collection_id*.

    Issues a count-only query (size 0) against the tenant/venue read alias,
    so no documents are transferred — only the hit total.
    """
    read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
    collection_filter = {'term': {'collection': collection_id}}
    search_dsl = {
        'query': {'bool': {'must': [collection_filter]}},
        'size': 0,  # only the total count is needed
    }
    search_result = self.__es.query(search_dsl, querying_index=read_alias_name)
    return self.__es.get_result_size(search_result)

def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
if 'sort' not in search_dsl: # We cannot paginate w/o sort. So, max is 10k items:
Expand Down
70 changes: 69 additions & 1 deletion cumulus_lambda_functions/uds_api/collections_api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import os
from datetime import datetime
from typing import Union

from pystac import Catalog, Link
from pystac import Catalog, Link, Collection, Extent, SpatialExtent, TemporalExtent, Summaries, Provider

from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections

Expand Down Expand Up @@ -276,3 +278,69 @@ async def query_collections(request: Request, collection_id: Union[str, None] =
if collections_result['statusCode'] == 200:
return collections_result['body']
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])

@router.delete("/{collection_id}")
@router.delete("/{collection_id}/")
async def delete_single_collection(request: Request, collection_id: str):
LOGGER.debug(f'starting delete_single_collection: {collection_id}')
LOGGER.debug(f'starting delete_single_collection request: {request}')

authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
.get_instance(UDSAuthorizerFactory.cognito,
es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443'))
)
auth_info = FastApiUtils.get_authorization_info(request)
uds_collections = UdsCollections(es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'))
if collection_id is None or collection_id == '':
raise HTTPException(status_code=500, detail=f'missing or invalid collection_id: {collection_id}')
collection_identifier = uds_collections.decode_identifier(collection_id)
if not authorizer.is_authorized_for_collection(DBConstants.delete, collection_id,
auth_info['ldap_groups'],
collection_identifier.tenant,
collection_identifier.venue):
LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
raise HTTPException(status_code=403, detail=json.dumps({
'message': 'not authorized to execute this action'
}))

granules_count = GranulesDbIndex().get_size(collection_identifier.tenant, collection_identifier.venue,
collection_id)
LOGGER.debug(f'granules_count: {granules_count} for {collection_id}')
if granules_count > 0:
LOGGER.debug(f'NOT deleting {collection_id} as it is not empty')
raise HTTPException(status_code=409, detail=f'NOT deleting {collection_id} as it is not empty')

try:
new_collection = Collection(
id=collection_id,
title=collection_id,
description='TODO',
extent = Extent(
SpatialExtent([[0.0, 0.0, 0.0, 0.0]]),
TemporalExtent([[datetime.utcnow(), datetime.utcnow()]])
),
license = "proprietary",
providers = [],
# title=input_collection['LongName'],
# keywords=[input_collection['SpatialKeywords']['Keyword']],
summaries = Summaries({
"totalGranules": [-1],
}),
)
new_collection.links = [
Link(rel='root',
target=f'./collection.json',
media_type='application/json', title=f"{new_collection.id}"),
Link(rel='item',
target='./collection.json',
media_type='application/json', title=f"{new_collection.id} Granules")
]
creation_result = CollectionDapaCreation(new_collection.to_dict(False, False)).delete()
except Exception as e:
LOGGER.exception('failed during ingest_cnm_dapa')
raise HTTPException(status_code=500, detail=str(e))
if creation_result['statusCode'] < 300:
return creation_result['body'], creation_result['statusCode']
raise HTTPException(status_code=creation_result['statusCode'], detail=creation_result['body'])
79 changes: 79 additions & 0 deletions cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ def __init__(self, request_body):
self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True)
self.__cumulus_collection_query = CollectionsQuery('', '')

def __delete_collection_cumulus(self, cumulus_collection_doc):
    """Delete the collection from Cumulus.

    Returns a (error, result) pair: (error-response-dict, None) when the
    deletion response has no 'status', otherwise (None, raw delete result).
    """
    delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
    if 'status' not in delete_result:
        # Fixed log message: it previously referenced 'creation_result'
        # although the variable being reported is the deletion result.
        LOGGER.error(f'status not in delete_result: {delete_result}')
        return {
            'statusCode': 500,
            'body': {
                'message': delete_result
            }
        }, None
    return None, delete_result

def __create_collection_cumulus(self, cumulus_collection_doc):
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
if 'status' not in creation_result:
Expand Down Expand Up @@ -116,6 +128,37 @@ def __create_rules_cumulus(self, cumulus_collection_doc):
}
return None

def __delete_rules_cumulus(self, cumulus_collection_doc):
    """Delete the collection's SQS ingestion rule from Cumulus.

    Returns None on success, or an error-response dict when the deletion
    response has no 'status' key.
    """
    rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(
        cumulus_collection_doc,
        self.__cumulus_lambda_prefix
    )
    if 'status' not in rule_deletion_result:
        # Fixed copy-pasted message: it previously said
        # 'rule_creation_result. deleting collection' in the rule-deletion path.
        LOGGER.error(f'status not in rule_deletion_result: {rule_deletion_result}')
        return {
            'statusCode': 500,
            'body': {
                'message': rule_deletion_result,
                'details': f'rule deletion result: {rule_deletion_result}'
            }
        }
    return None

def __delete_collection_uds(self):
    """Delete the collection document from the UDS (Elasticsearch) index.

    Returns None on success, or an error-response dict on failure.
    """
    try:
        # Return value of delete_collection is not needed; success is implied
        # by the absence of an exception.
        self.__uds_collection.delete_collection(
            collection_id=self.__collection_transformer.get_collection_id()
        )
    except Exception as e:
        # Fixed copy-pasted message: this is a deletion, not an addition.
        LOGGER.exception('failed to delete collection from Elasticsearch')
        return {
            'statusCode': 500,
            'body': {
                'message': f'unable to delete collection from Elasticsearch: {str(e)}',
            }
        }
    return None

def __create_collection_uds(self, cumulus_collection_doc):

try:
Expand Down Expand Up @@ -143,6 +186,42 @@ def __create_collection_uds(self, cumulus_collection_doc):
}
return None

def delete(self):
    """Delete the collection from Cumulus (rules + collection) and the UDS index.

    Per-component outcomes are accumulated in the response body instead of
    aborting on the first failure, so partial deletions are still reported.

    Returns a dict with 'statusCode' (200 on completion, 500 on unexpected
    error) and a 'body' describing each deletion step.
    """
    deletion_result = {}
    try:
        cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
        self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
        LOGGER.debug(f'__provider_id: {self.__provider_id}')

        if self.__include_cumulus:
            rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc)
            deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded'
            delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc)
            deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result
        else:
            deletion_result['cumulus_rule_deletion'] = 'NA'
            deletion_result['cumulus_collection_deletion'] = 'NA'

        uds_deletion_result = self.__delete_collection_uds()
        deletion_result['uds_collection_deletion'] = uds_deletion_result if uds_deletion_result is not None else 'succeeded'
    except Exception as e:
        # Fixed copy-pasted messages: this path deletes, it does not create.
        LOGGER.exception('error while deleting collection')
        return {
            'statusCode': 500,
            'body': {
                'message': 'error while deleting collection. check details',
                'details': str(e)
            }
        }
    # Previously logged a vestigial `creation_result = 'NA'` placeholder;
    # log the actual per-step deletion outcomes instead.
    LOGGER.info(f'deletion_result: {deletion_result}')
    return {
        'statusCode': 200,
        'body': {
            'message': deletion_result
        }
    }

def create(self):
try:
cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
Expand Down
Loading