release/9.15.1 #600

Merged
merged 9 commits into from
Jun 25, 2025
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [9.15.1] - 2025-06-25
### Fixed
- [#597](https://github.com/unity-sds/unity-data-services/pull/597) fix: collection deletion - adding cumulus execution deletions

## [9.15.0] - 2025-06-03
### Changed
- [#591](https://github.com/unity-sds/unity-data-services/pull/591) feat: empty collection delete

## [9.14.0] - 2025-06-02
### Changed
- [#593](https://github.com/unity-sds/unity-data-services/pull/593) feat: add daac_provider

## [9.13.0] - 2025-05-29
### Changed
- [#589](https://github.com/unity-sds/unity-data-services/pull/589) feat: daac product.name = granule id

## [9.12.0] - 2025-05-24
### Changed
- [#585](https://github.com/unity-sds/unity-data-services/pull/585) feat: add ram size in lambdas
103 changes: 103 additions & 0 deletions cumulus_lambda_functions/cumulus_wrapper/query_collections.py
@@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list):
collection_names = [k.split('___')[0] for k in collection_ids]
self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}')
return self

def get_size(self, private_api_prefix: str):
query_params = {'field': 'status', 'type': 'collections'}
main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}
@@ -158,6 +159,108 @@ def query_rules(self, private_api_prefix: str):
return {'server_error': f'error while invoking:{str(e)}'}
return {'results': query_result}

def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
# $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken'
underscore_collection_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"]) # replace any character that's not alphanumeric or underscore with 3 underscores
rule_name = f'{underscore_collection_name}___{new_collection["version"]}___rules_sqs'
payload = {
'httpMethod': 'DELETE',
'resource': '/{proxy+}',
'path': f'/{self.__rules_key}/{rule_name}',
'headers': {
'Content-Type': 'application/json',
},
}
LOGGER.debug(f'payload: {payload}')
try:
query_result = self._invoke_api(payload, private_api_prefix)
"""
{'statusCode': 500, 'body': '', 'headers': {}}
"""
if query_result['statusCode'] >= 500:
LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
return {'server_error': query_result}
if query_result['statusCode'] >= 400:
LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
return {'client_error': query_result}
query_result = json.loads(query_result['body'])
LOGGER.debug(f'json query_result: {query_result}')
if 'message' not in query_result:
return {'server_error': f'invalid response: {query_result}'}
except Exception as e:
LOGGER.exception('error while invoking')
return {'server_error': f'error while invoking:{str(e)}'}
return {'status': query_result['message']}
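
For illustration, a minimal sketch of how the rule name in the DELETE path above is derived (the collection values are hypothetical, not from this PR):

import re

sample_collection = {'name': 'SNDR-SNPP.ATMS@L1A', 'version': '001'}  # hypothetical input
underscored = re.sub(r'[^a-zA-Z0-9_]', '___', sample_collection['name'])  # every character outside [a-zA-Z0-9_] becomes '___'
rule_name = f'{underscored}___{sample_collection["version"]}___rules_sqs'
# rule_name == 'SNDR___SNPP___ATMS___L1A___001___rules_sqs'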

def delete_executions(self, new_collection: dict, private_api_prefix: str):
# $ curl --request POST https://example.com/executions/bulk-delete-by-collection --header 'Authorization: Bearer ReplaceWithTheToken'
request_body = {
"collectionId": f'{new_collection["name"]}___{new_collection["version"]}',
"esBatchSize": 10000,
"dbBatchSize": 50000
}
payload = {
'httpMethod': 'POST',
'resource': '/{proxy+}',
'path': f'/executions/bulk-delete-by-collection',
'headers': {
'Content-Type': 'application/json',
},
'body': json.dumps(request_body)
}
LOGGER.debug(f'payload: {payload}')
try:
query_result = self._invoke_api(payload, private_api_prefix)
"""
{'statusCode': 500, 'body': '', 'headers': {}}
"""
if query_result['statusCode'] >= 500:
LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
return {'server_error': query_result}
if query_result['statusCode'] >= 400:
LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
return {'client_error': query_result}
query_result = json.loads(query_result['body'])
LOGGER.debug(f'json query_result: {query_result}')
if 'id' not in query_result:
return {'server_error': f'invalid response: {query_result}'}
except Exception as e:
LOGGER.exception('error while invoking')
return {'server_error': f'error while invoking:{str(e)}'}
return {'status': query_result}

def list_executions(self, new_collection: dict, private_api_prefix: str):
# $ curl --request GET https://example.com/executions?collectionId=ReplaceWithCollectionId --header 'Authorization: Bearer ReplaceWithTheToken'
payload = {
'httpMethod': 'GET',
'resource': '/{proxy+}',
'path': f'/executions',
'queryStringParameters': {'limit': '100', 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'},
'headers': {
'Content-Type': 'application/json',
}
}
LOGGER.debug(f'payload: {payload}')
try:
query_result = self._invoke_api(payload, private_api_prefix)
"""
{'statusCode': 500, 'body': '', 'headers': {}}
"""
if query_result['statusCode'] >= 500:
LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
return {'server_error': query_result}
if query_result['statusCode'] >= 400:
LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
return {'client_error': query_result}
query_result = json.loads(query_result['body'])
LOGGER.debug(f'json query_result: {query_result}')
if 'results' not in query_result:
return {'server_error': f'invalid response: {query_result}'}
except Exception as e:
LOGGER.exception('error while invoking')
return {'server_error': f'error while invoking:{str(e)}'}
return {'results': query_result['results']}
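
A hedged sketch of how these helpers could be chained when tearing down a collection (the class name, constructor arguments, and API prefix are assumptions for illustration; only the three method signatures come from this diff):

# hypothetical wiring; CollectionsQuery and its constructor arguments are assumed
collections_api = CollectionsQuery(cumulus_base='', cumulus_token='')
collection = {'name': 'my_collection', 'version': '001'}
prefix = 'cumulus-private-api'  # assumed private API Gateway prefix

executions = collections_api.list_executions(collection, prefix)
if executions.get('results'):
    collections_api.delete_executions(collection, prefix)  # bulk-delete executions before removing the collection
collections_api.delete_sqs_rules(collection, prefix)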

def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
"""
curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{
6 changes: 3 additions & 3 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
@@ -98,7 +98,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):

def send_to_daac_internal(self, uds_cnm_json: dict):
LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier'])  # this is normally meant for collections; since our granule ID is prefixed with the collection ID, we can use it here
granule_identifier = UdsCollections.decode_granule_identifier(uds_cnm_json['identifier'])  # this is normally meant for collections; since our granule ID is prefixed with the collection ID, we can use it here
self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
if daac_config is None or len(daac_config) < 1:
@@ -117,10 +117,10 @@ def send_to_daac_internal(self, uds_cnm_json: dict):
},
"identifier": uds_cnm_json['identifier'],
"submissionTime": f'{TimeUtils.get_current_time()}Z',
"provider": granule_identifier.tenant,
"provider": daac_config['daac_provider'] if 'daac_provider' in daac_config else granule_identifier.tenant,
"version": "1.6.0", # TODO this is hardcoded?
"product": {
"name": granule_identifier.id,
"name": granule_identifier.granule,
# "dataVersion": daac_config['daac_data_version'],
'files': self.__extract_files(uds_cnm_json, daac_config),
}
@@ -9,6 +9,9 @@ class GranulesIndexMapping:
"daac_data_version": {
"type": "keyword"
},
"daac_provider": {
"type": "keyword"
},
"daac_role_arn": {
"type": "keyword"
},
2 changes: 2 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/archive_index.py
@@ -19,6 +19,7 @@ class UdsArchiveConfigIndex:
'collection', 'ss_username', 'archiving_types'],
'properties': {
'daac_collection_id': {'type': 'string'},
'daac_provider': {'type': 'string'},
'daac_sns_topic_arn': {'type': 'string'},
'daac_data_version': {'type': 'string'},
'daac_role_arn': {'type': 'string'},
@@ -36,6 +37,7 @@ class UdsArchiveConfigIndex:
'collection', 'ss_username', 'archiving_types'],
'properties': {
'daac_collection_name': {'type': 'string'},
'daac_provider': {'type': 'string'},
'daac_sns_topic_arn': {'type': 'string'},
'daac_data_version': {'type': 'string'},
'daac_role_arn': {'type': 'string'},
11 changes: 11 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/granules_db_index.py
@@ -305,6 +305,17 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str
# TODO validate custom metadata vs the latest index to filter extra items
return

def get_size(self, tenant: str, tenant_venue: str, collection_id: str):
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
search_dsl = {
'query': {'bool': {'must': [{
'term': {'collection': collection_id}
}]}},
'size': 0
}
search_result = self.__es.query(search_dsl, querying_index=read_alias_name)
return self.__es.get_result_size(search_result)
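
For reference, the size-0 query above asks Elasticsearch only for the hit count; a standalone sketch with the official elasticsearch-py client (7.x-style call; the host, alias, and collection ID are hypothetical, and the project actually goes through its own wrapper):

from elasticsearch import Elasticsearch

es = Elasticsearch('https://localhost:9200')  # hypothetical endpoint
search_dsl = {
    'query': {'bool': {'must': [{'term': {'collection': 'URN:NASA:UNITY:tenant_x:dev:my_collection___001'}}]}},
    'size': 0,  # return no documents, only metadata including the total hit count
}
result = es.search(index='uds_granules_read_alias_tenant_venue', body=search_dsl)
granules_count = result['hits']['total']['value']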

def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
if 'sort' not in search_dsl: # We cannot paginate w/o sort. So, max is 10k items:
9 changes: 9 additions & 0 deletions cumulus_lambda_functions/lib/uds_db/uds_collections.py
@@ -12,6 +12,7 @@


CollectionIdentifier = namedtuple('CollectionIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id'])
GranuleIdentifier = namedtuple('GranuleIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id', 'granule'])


class UdsCollections:
@@ -35,6 +36,14 @@ def decode_identifier(incoming_identifier: str) -> CollectionIdentifier:
raise ValueError(f'invalid collection: {collection_identifier_parts}')
return CollectionIdentifier._make(collection_identifier_parts[0:6])

@staticmethod
def decode_granule_identifier(incoming_identifier: str) -> GranuleIdentifier:
collection_identifier_parts = incoming_identifier.split(':')
if len(collection_identifier_parts) < 7:
raise ValueError(f'invalid granule identifier: {collection_identifier_parts}')
return GranuleIdentifier._make(collection_identifier_parts[0:6] + [':'.join(collection_identifier_parts[6:])])
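
A worked example of the decoding above (the identifier value is illustrative, not from this PR):

sample = 'URN:NASA:UNITY:tenant_x:dev:my_collection___001:granule_2024:001'  # hypothetical granule ID containing a colon
granule = UdsCollections.decode_granule_identifier(sample)
# granule.tenant == 'tenant_x', granule.venue == 'dev'
# granule.id == 'my_collection___001' (the collection part)
# granule.granule == 'granule_2024:001', because parts[6:] are re-joined, so colons inside the granule ID survive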


def __bbox_to_polygon(self, bbox: list):
if len(bbox) != 4:
raise ValueError(f'not bounding box: {bbox}')
70 changes: 69 additions & 1 deletion cumulus_lambda_functions/uds_api/collections_api.py
@@ -1,10 +1,12 @@
import json
import os
from datetime import datetime
from typing import Union

from pystac import Catalog, Link
from pystac import Catalog, Link, Collection, Extent, SpatialExtent, TemporalExtent, Summaries, Provider

from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex

from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections

@@ -276,3 +278,69 @@ async def query_collections(request: Request, collection_id: Union[str, None] =
if collections_result['statusCode'] == 200:
return collections_result['body']
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])

@router.delete("/{collection_id}")
@router.delete("/{collection_id}/")
async def delete_single_collection(request: Request, collection_id: str):
LOGGER.debug(f'starting delete_single_collection: {collection_id}')
LOGGER.debug(f'starting delete_single_collection request: {request}')

authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
.get_instance(UDSAuthorizerFactory.cognito,
es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443'))
)
auth_info = FastApiUtils.get_authorization_info(request)
uds_collections = UdsCollections(es_url=os.getenv('ES_URL'),
es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'))
if collection_id is None or collection_id == '':
raise HTTPException(status_code=500, detail=f'missing or invalid collection_id: {collection_id}')
collection_identifier = uds_collections.decode_identifier(collection_id)
if not authorizer.is_authorized_for_collection(DBConstants.delete, collection_id,
auth_info['ldap_groups'],
collection_identifier.tenant,
collection_identifier.venue):
LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
raise HTTPException(status_code=403, detail=json.dumps({
'message': 'not authorized to execute this action'
}))

granules_count = GranulesDbIndex().get_size(collection_identifier.tenant, collection_identifier.venue,
collection_id)
LOGGER.debug(f'granules_count: {granules_count} for {collection_id}')
if granules_count > 0:
LOGGER.debug(f'NOT deleting {collection_id} as it is not empty')
raise HTTPException(status_code=409, detail=f'NOT deleting {collection_id} as it is not empty')

try:
new_collection = Collection(
id=collection_id,
title=collection_id,
description='TODO',
extent = Extent(
SpatialExtent([[0.0, 0.0, 0.0, 0.0]]),
TemporalExtent([[datetime.utcnow(), datetime.utcnow()]])
),
license = "proprietary",
providers = [],
# title=input_collection['LongName'],
# keywords=[input_collection['SpatialKeywords']['Keyword']],
summaries = Summaries({
"totalGranules": [-1],
}),
)
new_collection.links = [
Link(rel='root',
target=f'./collection.json',
media_type='application/json', title=f"{new_collection.id}"),
Link(rel='item',
target='./collection.json',
media_type='application/json', title=f"{new_collection.id} Granules")
]
creation_result = CollectionDapaCreation(new_collection.to_dict(False, False)).delete()
except Exception as e:
LOGGER.exception('failed during delete_single_collection')
raise HTTPException(status_code=500, detail=str(e))
if creation_result['statusCode'] < 300:
return creation_result['body'], creation_result['statusCode']
raise HTTPException(status_code=creation_result['statusCode'], detail=creation_result['body'])
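
A hedged usage sketch of the new endpoint (the base URL, route prefix, and token are placeholders; the router prefix is defined outside this diff):

import requests

resp = requests.delete(
    'https://uds.example.com/collections/URN:NASA:UNITY:tenant_x:dev:my_collection___001',  # hypothetical URL
    headers={'Authorization': 'Bearer <token>'},
)
# 403: not authorized; 409: collection still has granules;
# otherwise the status from CollectionDapaCreation(...).delete() is propagated
print(resp.status_code, resp.text)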