Skip to content

Commit 46dbb10

Browse files
authored
feat: added filter keyword in granules endpoint + repeatedly checking with time boundary for cataloging result (#156)
* feat: first cut of checking cataloging granules * fix: add missing file * fix: unit test + bugfix * feat: update response json in catalog * chore: update version * chore: update documentation * chore: remove invalid MD in changelog * fix: update due to feature-collection update from other PR
1 parent a5733ee commit 46dbb10

File tree

12 files changed

+313
-9
lines changed

12 files changed

+313
-9
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [5.1.0] - 2023-06-08
9+
### Added
10+
- [#156](https://github.com/unity-sds/unity-data-services/pull/156) feat: added filter keyword in granules endpoint + repeatedly checking with time boundary for cataloging result
11+
812
## [5.0.1] - 2023-06-21
913
### Added
1014
- [#165](https://github.com/unity-sds/unity-data-services/pull/165) fix: convert all outputs into json str

cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def get_all_granules(self, collection_id='*', limit=-1, date_from='', date_to=''
8282
items_collection_shell['features'] = results[0: limit]
8383
return items_collection_shell
8484

85-
def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to=''):
85+
def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to='', filters=None):
8686
"""
8787
TODO: pagination. getting only 1st 1k item
8888
:param collection_id:
@@ -95,6 +95,8 @@ def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', da
9595
dapa_granules_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/{collection_id}/items?limit={limit}&offset={offset}'
9696
if date_from != '' or date_to != '':
9797
dapa_granules_api = f"{dapa_granules_api}&datetime={date_from if date_from != '' else '..'}/{date_to if date_to != '' else '..'}"
98+
# Append the filter only when the caller actually supplied one.
# The guard must test the `filters` argument: the built-in `filter` is never None,
# so testing it would send `&filter=null` on every request without a filter.
if filters is not None:
    dapa_granules_api = f'{dapa_granules_api}&filter={json.dumps(filters)}'
98100
LOGGER.debug(f'dapa_granules_api: {dapa_granules_api}')
99101
LOGGER.debug(f'getting granules for: {dapa_granules_api}')
100102
self.__get_token()

cumulus_lambda_functions/cumulus_granules_dapa/cumulus_granules_dapa.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33

44
from cumulus_lambda_functions.cumulus_wrapper.query_granules import GranulesQuery
5+
from cumulus_lambda_functions.lib.json_validator import JsonValidator
56
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
67
from cumulus_lambda_functions.lib.utils.lambda_api_gateway_utils import LambdaApiGatewayUtils
78

@@ -15,7 +16,8 @@ def __init__(self, event):
1516
'path': '/collections/observation/items',
1617
'httpMethod': 'GET',
1718
'headers': {'Authorization': ' Bearer asdfafweaw'}, 'multiValueHeaders': {'Authorization': [' Bearer asdfafweaw']},
18-
'queryStringParameters': {'datetime': 'asfa;lsfdjafal', 'bbox': '12,12,12,3', 'limit': '12'}, 'multiValueQueryStringParameters': {'datetime': ['asfa;lsfdjafal'], 'bbox': ['12,12,12,3'], 'limit': ['12']}, 'pathParameters': None, 'stageVariables': None, 'requestContext': {'resourceId': 'hgdxj6', 'resourcePath': '/collections/observation/items', 'httpMethod': 'GET', 'extendedRequestId': 'SSEa6F80PHcF3xg=', 'requestTime': '17/May/2022:18:20:40 +0000', 'path': '/collections/observation/items', 'accountId': '884500545225', 'protocol': 'HTTP/1.1', 'stage': 'test-invoke-stage', 'domainPrefix': 'testPrefix', 'requestTimeEpoch': 1652811640832, 'requestId': '703f404d-cb95-43d3-8f48-523e5b1860e4', 'identity': {'cognitoIdentityPoolId': None, 'cognitoIdentityId': None, 'apiKey': 'test-invoke-api-key', 'principalOrgId': None, 'cognitoAuthenticationType': None, 'userArn': 'arn:aws:sts::884500545225:assumed-role/power_user/wai.phyo@jpl.nasa.gov', 'apiKeyId': 'test-invoke-api-key-id', 'userAgent': 'aws-internal/3 aws-sdk-java/1.12.201 Linux/5.4.181-109.354.amzn2int.x86_64 OpenJDK_64-Bit_Server_VM/25.322-b06 java/1.8.0_322 vendor/Oracle_Corporation cfg/retry-mode/standard', 'accountId': '884500545225', 'caller': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov', 'sourceIp': 'test-invoke-source-ip', 'accessKey': 'ASIA434CXH3EV56T6AS5', 'cognitoAuthenticationProvider': None, 'user': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov'}, 'domainName': 'testPrefix.testDomainName', 'apiId': 'gwaxi7ijl4'}, 'body': None, 'isBase64Encoded': False}
19+
'queryStringParameters': {'datetime': 'asfa;lsfdjafal', 'bbox': '12,12,12,3', 'limit': '12'},
20+
'multiValueQueryStringParameters': {'datetime': ['asfa;lsfdjafal'], 'bbox': ['12,12,12,3'], 'limit': ['12']}, 'pathParameters': None, 'stageVariables': None, 'requestContext': {'resourceId': 'hgdxj6', 'resourcePath': '/collections/observation/items', 'httpMethod': 'GET', 'extendedRequestId': 'SSEa6F80PHcF3xg=', 'requestTime': '17/May/2022:18:20:40 +0000', 'path': '/collections/observation/items', 'accountId': '884500545225', 'protocol': 'HTTP/1.1', 'stage': 'test-invoke-stage', 'domainPrefix': 'testPrefix', 'requestTimeEpoch': 1652811640832, 'requestId': '703f404d-cb95-43d3-8f48-523e5b1860e4', 'identity': {'cognitoIdentityPoolId': None, 'cognitoIdentityId': None, 'apiKey': 'test-invoke-api-key', 'principalOrgId': None, 'cognitoAuthenticationType': None, 'userArn': 'arn:aws:sts::884500545225:assumed-role/power_user/wai.phyo@jpl.nasa.gov', 'apiKeyId': 'test-invoke-api-key-id', 'userAgent': 'aws-internal/3 aws-sdk-java/1.12.201 Linux/5.4.181-109.354.amzn2int.x86_64 OpenJDK_64-Bit_Server_VM/25.322-b06 java/1.8.0_322 vendor/Oracle_Corporation cfg/retry-mode/standard', 'accountId': '884500545225', 'caller': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov', 'sourceIp': 'test-invoke-source-ip', 'accessKey': 'ASIA434CXH3EV56T6AS5', 'cognitoAuthenticationProvider': None, 'user': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov'}, 'domainName': 'testPrefix.testDomainName', 'apiId': 'gwaxi7ijl4'}, 'body': None, 'isBase64Encoded': False}
1921
2022
:param event:
2123
"""
@@ -40,6 +42,72 @@ def __init__(self, event):
4042
self.__cumulus.with_page_number(self.__page_number)
4143
self.__get_time_range()
4244
self.__get_collection_id()
45+
self.__get_filter()
46+
47+
def __get_filter(self):
    """
    Read the optional `filter` entry of the event and forward an `in` filter to the Cumulus query.

    Only the `in` operator is acted upon; a filter without an `in` key is ignored.
    An `in` filter that fails schema validation is logged and ignored.

    Reference: https://portal.ogc.org/files/96288#rc_filter

    Example filter documents from the reference:

    { "eq": [ { "property": "city" }, "Toronto" ] }

    {
        "like": [
            { "property": "name" },
            "Smith."
        ],
        "singleChar": ".",
        "nocase": true
    }

    {
        "in": {
            "value": { "property": "cityName" },
            "list": [ "Toronto", "Frankfurt", "Tokyo", "New York" ],
            "nocase": false
        }
    }

    NOTE(review): `filter` is looked up at the top level of the event; confirm it is not
    expected under `queryStringParameters` instead.

    :return: self, so that calls can be chained
    """
    if 'filter' not in self.__event:
        return self
    filter_event = json.loads(self.__event['filter'])
    if 'in' not in filter_event:
        return self
    # "type" must be a type name; "required" and "properties" are its siblings, not its children.
    schema = {
        "type": "object",
        "required": ["in"],
        "properties": {
            "in": {
                "type": "object",
                "required": ["value", "list"],
                "properties": {
                    "value": {
                        "type": "object",
                        "required": ["property"],
                        "properties": {
                            "property": {"type": "string"}
                        }
                    },
                    "list": {
                        "type": "array",
                        "minItems": 1,
                        "items": {
                            "type": "string"
                        }
                    }
                }
            }
        }
    }
    # A non-None validation result is treated as an error description.
    filter_event_validator_result = JsonValidator(schema).validate(filter_event)
    if filter_event_validator_result is not None:
        LOGGER.error(f'invalid event: {filter_event_validator_result}. event: {filter_event}')
        return self
    in_clause = filter_event['in']
    search_key = in_clause['value']['property']
    # `list` sits directly under `in` (next to `value`), as required by the schema above.
    search_values = in_clause['list']
    self.__cumulus.with_filter(search_key, search_values)
    return self
43111

44112
def __get_collection_id(self):
45113
if 'pathParameters' not in self.__event:

cumulus_lambda_functions/cumulus_stac/item_transformer.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,15 @@ def __init__(self):
298298
super().__init__()
299299
self.__stac_item_schema = json.loads(STAC_ITEM_SCHEMA)
300300
self.__cumulus_granule_schema = {}
301+
self.CUMULUS_2_STAC_KEYS_MAP = {
302+
'beginningDateTime': 'start_datetime',
303+
'endingDateTime': 'end_datetime',
304+
'productionDateTime': 'created',
305+
'updatedAt': 'updated',
306+
'granuleId': 'id',
307+
'collectionId': 'collection',
308+
}
309+
self.STAC_2_CUMULUS_KEYS_MAP = {v: k for k, v in self.CUMULUS_2_STAC_KEYS_MAP.items()}
301310

302311
def __get_asset_name(self, input_dict):
303312
if input_dict['type'] == 'data':
@@ -380,6 +389,7 @@ def to_stac(self, source: dict) -> dict:
380389
:param source:
381390
:return:
382391
"""
392+
383393
validation_result = JsonValidator(self.__cumulus_granule_schema).validate(source)
384394
if validation_result is not None:
385395
raise ValueError(f'invalid cumulus granule json: {validation_result}')

cumulus_lambda_functions/cumulus_wrapper/query_granules.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,22 @@ class GranulesQuery(CumulusBase):
1515
__ending_time_key = 'endingDateTime'
1616
__beginning_time_key = 'beginningDateTime'
1717
__collection_id_key = 'collectionId'
18+
__granules_id = 'granuleId'
1819

1920
def __init__(self, cumulus_base: str, cumulus_token: str):
2021
super().__init__(cumulus_base, cumulus_token)
2122
self._conditions.append('status=completed')
22-
23+
self._item_transformer = ItemTransformer()
24+
25+
def with_filter(self, filter_key, filter_values: list):
    """
    Add a `<cumulus key>__in=<v1>,<v2>,...` condition for a STAC property.

    :param filter_key: STAC-side property name; it is translated to the Cumulus key
        through `STAC_2_CUMULUS_KEYS_MAP` of the item transformer
    :param filter_values: values to match; they are joined with commas
    :return: self. No condition is added when the list is empty or the key is not in the map
    """
    if len(filter_values) == 0:
        return self
    if filter_key in self._item_transformer.STAC_2_CUMULUS_KEYS_MAP:
        cumulus_key = self._item_transformer.STAC_2_CUMULUS_KEYS_MAP[filter_key]
        joined_values = ",".join(filter_values)
        self._conditions.append(f'{cumulus_key}__in={joined_values}')
        return self
    # Unknown keys are reported and otherwise ignored.
    LOGGER.error(f'unknown key in STAC_2_CUMULUS_KEYS_MAP: {filter_key} ')
    return self
2334

2435
def with_collection_id(self, collection_id: str):
2536
self._conditions.append(f'{self.__collection_id_key}={collection_id}')
@@ -88,7 +99,7 @@ def query_direct_to_private_api(self, private_api_prefix: str):
8899
'httpMethod': 'GET',
89100
'resource': '/{proxy+}',
90101
'path': f'/{self.__granules_key}',
91-
'queryStringParameters': {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]},
102+
'queryStringParameters': {**{k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}},
92103
# 'queryStringParameters': {'limit': '30'},
93104
'headers': {
94105
'Content-Type': 'application/json',

cumulus_lambda_functions/stage_in_out/catalog_granules_unity.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
11
import json
22

33
from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
4+
from cumulus_lambda_functions.lib.time_utils import TimeUtils
45
from cumulus_lambda_functions.stage_in_out.catalog_granules_abstract import CatalogGranulesAbstract
56
import logging
67
import os
78

9+
from cumulus_lambda_functions.stage_in_out.cataloging_granules_status_checker import CatalogingGranulesStatusChecker
810

911
LOGGER = logging.getLogger(__name__)
1012

1113

1214
class CatalogGranulesUnity(CatalogGranulesAbstract):
1315
PROVIDER_ID_KEY = 'PROVIDER_ID'
1416
VERIFY_SSL_KEY = 'VERIFY_SSL'
17+
DELAY_SECOND = 'DELAY_SECOND'
18+
REPEAT_TIMES = 'REPEAT_TIMES'
1519

1620
def __init__(self) -> None:
1721
super().__init__()
1822
self.__provider_id = ''
1923
self.__verify_ssl = True
24+
self.__delaying_second = 30
25+
self.__repeating_times = 5
2026

2127
def __set_props_from_env(self):
2228
missing_keys = [k for k in [self.UPLOADED_FILES_JSON, self.PROVIDER_ID_KEY] if k not in os.environ]
@@ -25,6 +31,9 @@ def __set_props_from_env(self):
2531
self._retrieve_stac_json()
2632
self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
2733
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
34+
self.__delaying_second = int(os.environ.get(self.DELAY_SECOND, '30'))
35+
self.__repeating_times = int(os.environ.get(self.REPEAT_TIMES, '30'))
36+
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
2837
return self
2938

3039
def catalog(self, **kwargs):
@@ -38,4 +47,17 @@ def catalog(self, **kwargs):
3847
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
3948
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
4049
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
41-
return json.dumps(dapa_ingest_result)
50+
extracted_ids = [k['id'] for k in self._uploaded_files_json]
51+
LOGGER.debug(f'checking following IDs: {extracted_ids}')
52+
status_checker = CatalogingGranulesStatusChecker(self._uploaded_files_json[0]['collection'],
53+
extracted_ids,
54+
TimeUtils().get_datetime_obj().timestamp(),
55+
self.__delaying_second,
56+
self.__repeating_times,
57+
self.__verify_ssl)
58+
status_result = status_checker.verify_n_times()
59+
response_json = {
60+
'cataloging_request_status': dapa_ingest_result,
61+
'status_result': status_result
62+
}
63+
return json.dumps(response_json)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import logging
2+
import time
3+
4+
from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
5+
from cumulus_lambda_functions.cumulus_stac.item_transformer import ItemTransformer
6+
7+
LOGGER = logging.getLogger(__name__)
8+
9+
10+
class CatalogingGranulesStatusChecker:
    """
    Checks, by querying the DAPA granules endpoint, whether a set of granule IDs is registered.
    """

    def __init__(self, collection_id: str, granules_ids: list, threshold_datetime: int, delay: int, repeating_times: int, veriffy_ssl=False):
        """
        :param collection_id: collection that is queried for the granules
        :param granules_ids: granule IDs that are expected to be registered
        :param threshold_datetime: timestamp; granules whose `datetime` timestamp is older are not counted
        :param delay: value passed to `time.sleep` before every check
        :param repeating_times: maximum number of checks; values not greater than 0 become 1
        :param veriffy_ssl: forwarded to `DapaClient.with_verify_ssl` (spelling kept for existing callers)
        """
        self.__collection_id = collection_id
        self.__granules_ids = granules_ids
        self.__threshold_datetime = threshold_datetime
        self.__delay = delay
        if repeating_times > 0:
            self.__repeating_times = repeating_times
        else:
            self.__repeating_times = 1
        self.__dapa_client = DapaClient().with_verify_ssl(veriffy_ssl)
        self.__registered_granules = {}

    def verify_one_time(self):
        """
        Query the granules once and compare the result against the expected IDs.

        :return: dict with `cataloged` (True when nothing is missing), `missing_granules`
            and `registered_granules` (the matching items converted with `to_dict`)
        """
        id_filter = {
            'in': {
                'value': {'property': 'id'},
                'list': self.__granules_ids
            }
        }
        query_result = self.__dapa_client.get_granules(collection_id=self.__collection_id, filters=id_filter)
        LOGGER.debug(f'raw registered_granules: {query_result}')
        stac_items = []
        for feature in query_result['features']:
            stac_items.append(ItemTransformer().from_stac(feature))
        # Keep only items whose datetime is not older than the threshold.
        recent_items = {}
        for item in stac_items:
            if item.datetime.timestamp() >= self.__threshold_datetime:
                recent_items[item.id] = item
        self.__registered_granules = recent_items
        LOGGER.debug(f'registered_granules after filtering: {list(self.__registered_granules)}')
        LOGGER.debug(f'comparison queried v. expected: {len(self.__registered_granules)} v. {len(self.__granules_ids)}')
        missing_granules = []
        for granule_id in self.__granules_ids:
            if granule_id not in self.__registered_granules:
                missing_granules.append(granule_id)
        registered_dicts = []
        for item in self.__registered_granules.values():
            registered_dicts.append(item.to_dict(include_self_link=False, transform_hrefs=False))
        return {
            'cataloged': len(missing_granules) == 0,
            'missing_granules': missing_granules,
            'registered_granules': registered_dicts
        }

    def verify_n_times(self):
        """
        Sleep and check repeatedly until nothing is missing or the attempts are used up.

        :return: result of the last `verify_one_time` call
        """
        latest_result = {
            'missing_granules': [],
            'registered_granules': []
        }
        for attempt in range(self.__repeating_times):
            # The delay comes first, so even the first check waits.
            time.sleep(self.__delay)
            latest_result = self.verify_one_time()
            LOGGER.debug(f'time {attempt} verification result: {latest_result}')
            if len(latest_result['missing_granules']) == 0:
                break
        return latest_result
53+

docker/stage-in-stage-out/dc-004-catalog.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ services:
2727
PROVIDER_ID: 'SNPP'
2828
GRANULES_CATALOG_TYPE: 'UNITY'
2929
UPLOADED_FILES_JSON: '[{"id": "NEW_COLLECTION_EXAMPLE_L1B___9:test_file01", "collection": "NEW_COLLECTION_EXAMPLE_L1B___9", "assets": {"metadata": {"href": "s3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc.cas"}, "data": {"href": "s3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc"}}}]'
30+
31+
DELAY_SECOND: '45'
32+
REPEAT_TIMES: '5'
33+
3034
networks:
3135
- internal
3236
networks:

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
setup(
2020
name="cumulus_lambda_functions",
21-
version="5.0.1",
21+
version="5.1.0",
2222
packages=find_packages(),
2323
install_requires=install_requires,
2424
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],

tests/cumulus_lambda_functions/cumulus_wrapper/test_query_granules.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,17 @@ def test_01(self):
1414
self.assertTrue('results' in granules, f'results not in collections: {granules}')
1515
self.assertEqual(7, len(granules['results']), f'wrong length: {granules}')
1616
return
17+
18+
def test_02(self):
    """
    Query two granules by STAC `id` through the private API and check that both are returned in order.
    """
    lambda_prefix = 'uds-dev-cumulus'
    expected_ids = [
        'NEW_COLLECTION_EXAMPLE_L1B___9:test_file01',
        'NEW_COLLECTION_EXAMPLE_L1B___9:test_file02',
    ]

    query_granules = GranulesQuery('NA', 'NA')
    query_granules.with_collection_id('NEW_COLLECTION_EXAMPLE_L1B___9')
    query_granules.with_limit(2)
    query_granules.with_filter('id', expected_ids)
    granules = query_granules.query_direct_to_private_api(lambda_prefix)

    self.assertTrue('results' in granules, f'results not in collections: {granules}')
    results = granules['results']
    self.assertEqual(2, len(results), f'wrong length: {granules}')
    self.assertEqual(results[0]['id'], expected_ids[0])
    self.assertEqual(results[1]['id'], expected_ids[1])
    return

0 commit comments

Comments
 (0)