Skip to content

Commit 440ca2f

Browse files
authored
Merge pull request #600 from unity-sds/develop
release/9.15.1
2 parents fa897c4 + 850ca46 commit 440ca2f

File tree

13 files changed

+337
-37
lines changed

13 files changed

+337
-37
lines changed

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [9.15.1] - 2025-06-25
9+
### Fixed
10+
- [#597](https://github.com/unity-sds/unity-data-services/pull/597) fix: collection deletion - adding cumulus execution deletions
11+
12+
## [9.15.0] - 2025-06-03
13+
### Changed
14+
- [#591](https://github.com/unity-sds/unity-data-services/pull/591) feat: empty collection delete
15+
16+
## [9.14.0] - 2025-06-02
17+
### Changed
18+
- [#593](https://github.com/unity-sds/unity-data-services/pull/593) feat: add daac_provider
19+
20+
## [9.13.0] - 2025-05-29
21+
### Changed
22+
- [#589](https://github.com/unity-sds/unity-data-services/pull/589) feat: daac product.name = granule id
23+
824
## [9.12.0] - 2025-05-24
925
### Changed
1026
- [#585](https://github.com/unity-sds/unity-data-services/pull/585) feat: add ram size in lambdas

cumulus_lambda_functions/cumulus_wrapper/query_collections.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list):
3434
collection_names = [k.split('___')[0] for k in collection_ids]
3535
self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}')
3636
return self
37+
3738
def get_size(self, private_api_prefix: str):
3839
query_params = {'field': 'status', 'type': 'collections'}
3940
main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}
@@ -158,6 +159,108 @@ def query_rules(self, private_api_prefix: str):
158159
return {'server_error': f'error while invoking:{str(e)}'}
159160
return {'results': query_result}
160161

162+
def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
    """Delete the SQS ingestion rule for a collection via the private Cumulus API.

    Equivalent of:
    $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken'

    Returns {'status': <message>} on success, or a dict keyed by
    'server_error' / 'client_error' describing the failure.
    """
    # Rule names only allow [a-zA-Z0-9_]; every other character is replaced
    # with 3 underscores before composing the rule name.
    sanitized_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"])
    sqs_rule_name = f'{sanitized_name}___{new_collection["version"]}___rules_sqs'
    delete_payload = {
        'httpMethod': 'DELETE',
        'resource': '/{proxy+}',
        'path': f'/{self.__rules_key}/{sqs_rule_name}',
        'headers': {
            'Content-Type': 'application/json',
        },
    }
    LOGGER.debug(f'payload: {delete_payload}')
    try:
        query_result = self._invoke_api(delete_payload, private_api_prefix)
        # Example failure shape: {'statusCode': 500, 'body': '', 'headers': {}}
        status_code = query_result['statusCode']
        if status_code >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if status_code >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'message' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'status': query_result['message']}
194+
195+
def delete_executions(self, new_collection: dict, private_api_prefix: str):
    """Bulk-delete Cumulus execution records for a collection.

    POSTs to the private Cumulus API's /executions/bulk-delete-by-collection
    endpoint for the collection identified by new_collection's 'name' and
    'version'. Cumulus handles this as an async operation, so a successful
    response is expected to contain an 'id' for that operation.

    Returns {'status': <response dict>} on success, or a dict keyed by
    'server_error' / 'client_error' describing the failure.
    """
    request_body = {
        "collectionId": f'{new_collection["name"]}___{new_collection["version"]}',
        # Batch sizes for the Cumulus-side bulk deletion; values mirror the
        # defaults used elsewhere in this project. TODO confirm they are tuned.
        "esBatchSize": 10000,
        "dbBatchSize": 50000
    }
    payload = {
        'httpMethod': 'POST',
        'resource': '/{proxy+}',
        'path': '/executions/bulk-delete-by-collection',  # no placeholders: plain string, not f-string
        'headers': {
            'Content-Type': 'application/json',
        },
        'body': json.dumps(request_body)
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        query_result = self._invoke_api(payload, private_api_prefix)
        # Example failure shape: {'statusCode': 500, 'body': '', 'headers': {}}
        if query_result['statusCode'] >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if query_result['statusCode'] >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'id' not in query_result:
            # 'id' is the async-operation id Cumulus returns for bulk deletes.
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'status': query_result}
231+
232+
def list_executions(self, new_collection: dict, private_api_prefix: str):
    """List Cumulus execution records for a collection.

    GETs /executions from the private Cumulus API, filtered by the collection
    identified by new_collection's 'name' and 'version', with a page limit of
    100 (no pagination beyond the first page here).

    Returns {'results': [...]} on success, or a dict keyed by
    'server_error' / 'client_error' describing the failure.
    """
    payload = {
        'httpMethod': 'GET',
        'resource': '/{proxy+}',
        'path': '/executions',  # no placeholders: plain string, not f-string
        'queryStringParameters': {'limit': '100', 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'},
        'headers': {
            'Content-Type': 'application/json',
        }
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        query_result = self._invoke_api(payload, private_api_prefix)
        # Example failure shape: {'statusCode': 500, 'body': '', 'headers': {}}
        if query_result['statusCode'] >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if query_result['statusCode'] >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'results' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'results': query_result['results']}
263+
161264
def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
162265
"""
163266
curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{

cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
9898

9999
def send_to_daac_internal(self, uds_cnm_json: dict):
100100
LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
101-
granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
101+
granule_identifier = UdsCollections.decode_granule_identifier(uds_cnm_json['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
102102
self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
103103
daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
104104
if daac_config is None or len(daac_config) < 1:
@@ -117,10 +117,10 @@ def send_to_daac_internal(self, uds_cnm_json: dict):
117117
},
118118
"identifier": uds_cnm_json['identifier'],
119119
"submissionTime": f'{TimeUtils.get_current_time()}Z',
120-
"provider": granule_identifier.tenant,
120+
"provider": daac_config['daac_provider'] if 'daac_provider' in daac_config else granule_identifier.tenant,
121121
"version": "1.6.0", # TODO this is hardcoded?
122122
"product": {
123-
"name": granule_identifier.id,
123+
"name": granule_identifier.granule,
124124
# "dataVersion": daac_config['daac_data_version'],
125125
'files': self.__extract_files(uds_cnm_json, daac_config),
126126
}

cumulus_lambda_functions/granules_to_es/granules_index_mapping.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ class GranulesIndexMapping:
99
"daac_data_version": {
1010
"type": "keyword"
1111
},
12+
"daac_provider": {
13+
"type": "keyword"
14+
},
1215
"daac_role_arn": {
1316
"type": "keyword"
1417
},

cumulus_lambda_functions/lib/uds_db/archive_index.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class UdsArchiveConfigIndex:
1919
'collection', 'ss_username', 'archiving_types'],
2020
'properties': {
2121
'daac_collection_id': {'type': 'string'},
22+
'daac_provider': {'type': 'string'},
2223
'daac_sns_topic_arn': {'type': 'string'},
2324
'daac_data_version': {'type': 'string'},
2425
'daac_role_arn': {'type': 'string'},
@@ -36,6 +37,7 @@ class UdsArchiveConfigIndex:
3637
'collection', 'ss_username', 'archiving_types'],
3738
'properties': {
3839
'daac_collection_name': {'type': 'string'},
40+
'daac_provider': {'type': 'string'},
3941
'daac_sns_topic_arn': {'type': 'string'},
4042
'daac_data_version': {'type': 'string'},
4143
'daac_role_arn': {'type': 'string'},

cumulus_lambda_functions/lib/uds_db/granules_db_index.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,17 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str
305305
# TODO validate custom metadata vs the latest index to filter extra items
306306
return
307307

308+
def get_size(self, tenant: str, tenant_venue: str, collection_id: str):
    """Return the number of granules indexed for collection_id under the
    tenant/venue granules read alias."""
    alias = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
    collection_filter = {'term': {'collection': collection_id}}
    # size 0: we only need the total hit count, not the documents themselves.
    count_dsl = {
        'query': {'bool': {'must': [collection_filter]}},
        'size': 0
    }
    result = self.__es.query(count_dsl, querying_index=alias)
    return self.__es.get_result_size(result)
318+
308319
def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
309320
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
310321
if 'sort' not in search_dsl: # We cannot paginate w/o sort. So, max is 10k items:

cumulus_lambda_functions/lib/uds_db/uds_collections.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313

1414
CollectionIdentifier = namedtuple('CollectionIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id'])
15+
# Granule identifier: a collection identifier plus a trailing granule segment.
# NOTE: the typename must match the bound name ('GranuleIdentifier') so that
# repr() and pickling identify the type correctly — the original passed
# 'CollectionIdentifier' (copy-paste from the line above).
GranuleIdentifier = namedtuple('GranuleIdentifier', ['urn', 'nasa', 'project', 'tenant', 'venue', 'id', 'granule'])
1516

1617

1718
class UdsCollections:
@@ -35,6 +36,14 @@ def decode_identifier(incoming_identifier: str) -> CollectionIdentifier:
3536
raise ValueError(f'invalid collection: {collection_identifier_parts}')
3637
return CollectionIdentifier._make(collection_identifier_parts[0:6])
3738

39+
@staticmethod
def decode_granule_identifier(incoming_identifier: str) -> GranuleIdentifier:
    """Parse a colon-delimited URN into a GranuleIdentifier.

    The first six segments map to urn/nasa/project/tenant/venue/id; everything
    after the sixth colon (which may itself contain colons) is rejoined and
    kept as the granule segment. Raises ValueError when fewer than seven
    segments are present.
    """
    parts = incoming_identifier.split(':')
    if len(parts) < 7:
        raise ValueError(f'invalid collection: {parts}')
    granule_segment = ':'.join(parts[6:])
    return GranuleIdentifier._make(parts[0:6] + [granule_segment])
45+
46+
3847
def __bbox_to_polygon(self, bbox: list):
3948
if len(bbox) != 4:
4049
raise ValueError(f'not bounding box: {bbox}')

cumulus_lambda_functions/uds_api/collections_api.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import json
22
import os
3+
from datetime import datetime
34
from typing import Union
45

5-
from pystac import Catalog, Link
6+
from pystac import Catalog, Link, Collection, Extent, SpatialExtent, TemporalExtent, Summaries, Provider
67

78
from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants
9+
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
810

911
from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections
1012

@@ -276,3 +278,69 @@ async def query_collections(request: Request, collection_id: Union[str, None] =
276278
if collections_result['statusCode'] == 200:
277279
return collections_result['body']
278280
raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body'])
281+
282+
@router.delete("/{collection_id}")
@router.delete("/{collection_id}/")
async def delete_single_collection(request: Request, collection_id: str):
    """Delete a single, empty collection.

    Authorization: the caller must hold DELETE permission for the collection's
    tenant/venue. Responses: 403 when unauthorized, 409 when the collection
    still contains granules, 500 on missing collection_id or internal failure;
    otherwise the downstream deletion result body/status are returned.
    """
    LOGGER.debug(f'starting delete_single_collection: {collection_id}')
    LOGGER.debug(f'starting delete_single_collection request: {request}')

    authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
        .get_instance(UDSAuthorizerFactory.cognito,
                      es_url=os.getenv('ES_URL'),
                      es_port=int(os.getenv('ES_PORT', '443'))
                      )
    auth_info = FastApiUtils.get_authorization_info(request)
    uds_collections = UdsCollections(es_url=os.getenv('ES_URL'),
                                     es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'))
    if collection_id is None or collection_id == '':
        raise HTTPException(status_code=500, detail=f'missing or invalid collection_id: {collection_id}')
    collection_identifier = uds_collections.decode_identifier(collection_id)
    if not authorizer.is_authorized_for_collection(DBConstants.delete, collection_id,
                                                   auth_info['ldap_groups'],
                                                   collection_identifier.tenant,
                                                   collection_identifier.venue):
        LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
        raise HTTPException(status_code=403, detail=json.dumps({
            'message': 'not authorized to execute this action'
        }))

    # Refuse to delete a collection that still has granules indexed.
    granules_count = GranulesDbIndex().get_size(collection_identifier.tenant, collection_identifier.venue,
                                                collection_id)
    LOGGER.debug(f'granules_count: {granules_count} for {collection_id}')
    if granules_count > 0:
        LOGGER.debug(f'NOT deleting {collection_id} as it is not empty')
        raise HTTPException(status_code=409, detail=f'NOT deleting {collection_id} as it is not empty')

    try:
        # Build a placeholder STAC Collection: downstream deletion only needs
        # the identifier; extent/summaries are dummy values.
        new_collection = Collection(
            id=collection_id,
            title=collection_id,
            description='TODO',
            extent=Extent(
                SpatialExtent([[0.0, 0.0, 0.0, 0.0]]),
                TemporalExtent([[datetime.utcnow(), datetime.utcnow()]])
            ),
            license="proprietary",
            providers=[],
            summaries=Summaries({
                "totalGranules": [-1],
            }),
        )
        new_collection.links = [
            Link(rel='root',
                 target='./collection.json',
                 media_type='application/json', title=f"{new_collection.id}"),
            Link(rel='item',
                 target='./collection.json',
                 media_type='application/json', title=f"{new_collection.id} Granules")
        ]
        delete_result = CollectionDapaCreation(new_collection.to_dict(False, False)).delete()
    except Exception as e:
        # fixed: original logged 'failed during ingest_cnm_dapa' (copy-paste from another route)
        LOGGER.exception('failed during delete_single_collection')
        raise HTTPException(status_code=500, detail=str(e))
    if delete_result['statusCode'] < 300:
        return delete_result['body'], delete_result['statusCode']
    raise HTTPException(status_code=delete_result['statusCode'], detail=delete_result['body'])

0 commit comments

Comments
 (0)