Skip to content

Commit be1b163

Browse files
authored
Merge 8f706be into 9365112
2 parents 9365112 + 8f706be commit be1b163

File tree

2 files changed

+116
-63
lines changed

2 files changed

+116
-63
lines changed

cumulus_lambda_functions/cumulus_wrapper/query_collections.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list):
3434
collection_names = [k.split('___')[0] for k in collection_ids]
3535
self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}')
3636
return self
37+
3738
def get_size(self, private_api_prefix: str):
3839
query_params = {'field': 'status', 'type': 'collections'}
3940
main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}
@@ -191,6 +192,75 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
191192
return {'server_error': f'error while invoking:{str(e)}'}
192193
return {'status': query_result['message']}
193194

195+
def delete_executions(self, new_collection: dict, private_api_prefix: str):
196+
# $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken'
197+
request_body = {
198+
"collectionId": f'{new_collection["name"]}___{new_collection["version"]}',
199+
"esBatchSize": 10000,
200+
"dbBatchSize": 50000
201+
}
202+
payload = {
203+
'httpMethod': 'POST',
204+
'resource': '/{proxy+}',
205+
'path': f'/executions/bulk-delete-by-collection',
206+
'headers': {
207+
'Content-Type': 'application/json',
208+
},
209+
'body': json.dumps(request_body)
210+
}
211+
LOGGER.debug(f'payload: {payload}')
212+
try:
213+
query_result = self._invoke_api(payload, private_api_prefix)
214+
"""
215+
{'statusCode': 500, 'body': '', 'headers': {}}
216+
"""
217+
if query_result['statusCode'] >= 500:
218+
LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
219+
return {'server_error': query_result}
220+
if query_result['statusCode'] >= 400:
221+
LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
222+
return {'client_error': query_result}
223+
query_result = json.loads(query_result['body'])
224+
LOGGER.debug(f'json query_result: {query_result}')
225+
if 'id' not in query_result:
226+
return {'server_error': f'invalid response: {query_result}'}
227+
except Exception as e:
228+
LOGGER.exception('error while invoking')
229+
return {'server_error': f'error while invoking:{str(e)}'}
230+
return {'status': query_result}
231+
232+
def list_executions(self, new_collection: dict, private_api_prefix: str):
233+
# $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken'
234+
payload = {
235+
'httpMethod': 'GET',
236+
'resource': '/{proxy+}',
237+
'path': f'/executions',
238+
'queryStringParameters': {'limit': '100', 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'},
239+
'headers': {
240+
'Content-Type': 'application/json',
241+
}
242+
}
243+
LOGGER.debug(f'payload: {payload}')
244+
try:
245+
query_result = self._invoke_api(payload, private_api_prefix)
246+
"""
247+
{'statusCode': 500, 'body': '', 'headers': {}}
248+
"""
249+
if query_result['statusCode'] >= 500:
250+
LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
251+
return {'server_error': query_result}
252+
if query_result['statusCode'] >= 400:
253+
LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
254+
return {'client_error': query_result}
255+
query_result = json.loads(query_result['body'])
256+
LOGGER.debug(f'json query_result: {query_result}')
257+
if 'id' not in query_result:
258+
return {'server_error': f'invalid response: {query_result}'}
259+
except Exception as e:
260+
LOGGER.exception('error while invoking')
261+
return {'server_error': f'error while invoking:{str(e)}'}
262+
return {'status': query_result}
263+
194264
def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
195265
"""
196266
curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{

cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py

Lines changed: 46 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import os
3+
from time import sleep
34
from typing import Optional
45

56
import pystac
@@ -81,68 +82,17 @@ def __init__(self, request_body):
8182
self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True)
8283
self.__cumulus_collection_query = CollectionsQuery('', '')
8384

84-
def __delete_collection_cumulus(self, cumulus_collection_doc):
85-
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
86-
if 'status' not in delete_result:
87-
LOGGER.error(f'status not in creation_result: {delete_result}')
85+
def analyze_cumulus_result(self, cumulus_request_result):
86+
if 'status' not in cumulus_request_result:
87+
LOGGER.error(f'status not in cumulus_request_result: {cumulus_request_result}')
8888
return {
8989
'statusCode': 500,
9090
'body': {
91-
'message': delete_result
91+
'message': cumulus_request_result
9292
}
9393
}, None
94-
return None, delete_result
94+
return None, cumulus_request_result
9595

96-
def __create_collection_cumulus(self, cumulus_collection_doc):
97-
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
98-
if 'status' not in creation_result:
99-
LOGGER.error(f'status not in creation_result: {creation_result}')
100-
return {
101-
'statusCode': 500,
102-
'body': {
103-
'message': creation_result
104-
}
105-
}, None
106-
return None, creation_result
107-
108-
def __create_rules_cumulus(self, cumulus_collection_doc):
109-
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
110-
cumulus_collection_doc,
111-
self.__cumulus_lambda_prefix,
112-
self.__ingest_sqs_url,
113-
self.__provider_id,
114-
self.__workflow_name,
115-
)
116-
if 'status' not in rule_creation_result:
117-
LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_creation_result}')
118-
delete_collection_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix,
119-
cumulus_collection_doc['name'],
120-
cumulus_collection_doc['version'])
121-
self.__uds_collection.delete_collection(self.__collection_transformer.get_collection_id())
122-
return {
123-
'statusCode': 500,
124-
'body': {
125-
'message': rule_creation_result,
126-
'details': f'collection deletion result: {delete_collection_result}'
127-
}
128-
}
129-
return None
130-
131-
def __delete_rules_cumulus(self, cumulus_collection_doc):
132-
rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(
133-
cumulus_collection_doc,
134-
self.__cumulus_lambda_prefix
135-
)
136-
if 'status' not in rule_deletion_result:
137-
LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_deletion_result}')
138-
return {
139-
'statusCode': 500,
140-
'body': {
141-
'message': rule_deletion_result,
142-
'details': f'collection deletion result: {rule_deletion_result}'
143-
}
144-
}
145-
return None
14696

14797
def __delete_collection_uds(self):
14898
try:
@@ -189,17 +139,28 @@ def __create_collection_uds(self, cumulus_collection_doc):
189139
def delete(self):
190140
deletion_result = {}
191141
try:
142+
192143
cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
193144
self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
194145
LOGGER.debug(f'__provider_id: {self.__provider_id}')
195146
creation_result = 'NA'
196147

197148
if self.__include_cumulus:
198-
rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc)
199-
deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded'
200-
delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc)
149+
result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
150+
print(f'execution list result: {result}')
151+
self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
152+
self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
153+
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
154+
delete_err, delete_result = self.analyze_cumulus_result(delete_result)
155+
if delete_err is not None:
156+
LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}')
157+
self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
158+
self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
159+
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
160+
delete_err, delete_result = self.analyze_cumulus_result(delete_result)
201161
deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result
202162
else:
163+
deletion_result['cumulus_executions_deletion'] = 'NA'
203164
deletion_result['cumulus_rule_deletion'] = 'NA'
204165
deletion_result['cumulus_collection_deletion'] = 'NA'
205166

@@ -222,23 +183,45 @@ def delete(self):
222183
}
223184
}
224185

186+
def __delete_collection_rule(self, cumulus_collection_doc, deletion_result):
187+
if 'cumulus_rule_deletion' in deletion_result and 'statusCode' not in deletion_result['cumulus_rule_deletion']:
188+
return
189+
rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix)
190+
rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result)
191+
deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result
192+
return
193+
194+
def __delete_collection_execution(self, cumulus_collection_doc, deletion_result):
195+
executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
196+
exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result)
197+
deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result
198+
sleep(10)
199+
return
225200
def create(self):
226201
try:
227202
cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
228203
self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
229204
LOGGER.debug(f'__provider_id: {self.__provider_id}')
230205
creation_result = 'NA'
231206
if self.__include_cumulus:
232-
creation_err, creation_result = self.__create_collection_cumulus(cumulus_collection_doc)
207+
creation_cumulus_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
208+
creation_err, creation_result = self.analyze_cumulus_result(creation_cumulus_result)
233209
if creation_err is not None:
234210
return creation_err
235211
uds_creation_result = self.__create_collection_uds(cumulus_collection_doc)
236212
if uds_creation_result is not None:
237213
return uds_creation_result
238214
if self.__include_cumulus:
239-
create_rule_result = self.__create_rules_cumulus(cumulus_collection_doc)
240-
if create_rule_result is not None:
241-
return create_rule_result
215+
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
216+
cumulus_collection_doc,
217+
self.__cumulus_lambda_prefix,
218+
self.__ingest_sqs_url,
219+
self.__provider_id,
220+
self.__workflow_name,
221+
)
222+
create_rule_err, create_rule_result = self.analyze_cumulus_result(rule_creation_result)
223+
if create_rule_err is not None:
224+
return create_rule_err
242225
# validation_result = pystac.Collection.from_dict(self.__request_body).validate()
243226
# cumulus_collection_query = CollectionsQuery('', '')
244227
#

0 commit comments

Comments
 (0)