Skip to content

feat: added filter keyword in granules endpoint + repeatedly checking with time boundary for cataloging result #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 5, 2023
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [5.1.0] - 2023-07-05
### Added
- [#156](https://github.com/unity-sds/unity-data-services/pull/156) feat: added filter keyword in granules endpoint + repeatedly checking with time boundary for cataloging result

## [5.0.1] - 2023-06-21
### Fixed
- [#165](https://github.com/unity-sds/unity-data-services/pull/165) fix: convert all outputs into json str
Expand Down
4 changes: 3 additions & 1 deletion cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_all_granules(self, collection_id='*', limit=-1, date_from='', date_to=''
items_collection_shell['features'] = results[0: limit]
return items_collection_shell

def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to=''):
def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to='', filters=None):
"""
TODO: pagination. getting only 1st 1k item
:param collection_id:
Expand All @@ -95,6 +95,8 @@ def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', da
dapa_granules_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/{collection_id}/items?limit={limit}&offset={offset}'
if date_from != '' or date_to != '':
    # open-ended ranges use '..' per the STAC / OGC API datetime interval syntax
    dapa_granules_api = f"{dapa_granules_api}&datetime={date_from if date_from != '' else '..'}/{date_to if date_to != '' else '..'}"
# BUG FIX: the original tested `filter` — the Python builtin, which is never
# None — instead of the `filters` parameter, so `&filter=null` was appended
# to the URL even when no filter was requested.
if filters is not None:
    dapa_granules_api = f'{dapa_granules_api}&filter={json.dumps(filters)}'
LOGGER.debug(f'dapa_granules_api: {dapa_granules_api}')
LOGGER.debug(f'getting granules for: {dapa_granules_api}')
self.__get_token()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os

from cumulus_lambda_functions.cumulus_wrapper.query_granules import GranulesQuery
from cumulus_lambda_functions.lib.json_validator import JsonValidator
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.lib.utils.lambda_api_gateway_utils import LambdaApiGatewayUtils

Expand All @@ -15,7 +16,8 @@ def __init__(self, event):
'path': '/collections/observation/items',
'httpMethod': 'GET',
'headers': {'Authorization': ' Bearer asdfafweaw'}, 'multiValueHeaders': {'Authorization': [' Bearer asdfafweaw']},
'queryStringParameters': {'datetime': 'asfa;lsfdjafal', 'bbox': '12,12,12,3', 'limit': '12'}, 'multiValueQueryStringParameters': {'datetime': ['asfa;lsfdjafal'], 'bbox': ['12,12,12,3'], 'limit': ['12']}, 'pathParameters': None, 'stageVariables': None, 'requestContext': {'resourceId': 'hgdxj6', 'resourcePath': '/collections/observation/items', 'httpMethod': 'GET', 'extendedRequestId': 'SSEa6F80PHcF3xg=', 'requestTime': '17/May/2022:18:20:40 +0000', 'path': '/collections/observation/items', 'accountId': '884500545225', 'protocol': 'HTTP/1.1', 'stage': 'test-invoke-stage', 'domainPrefix': 'testPrefix', 'requestTimeEpoch': 1652811640832, 'requestId': '703f404d-cb95-43d3-8f48-523e5b1860e4', 'identity': {'cognitoIdentityPoolId': None, 'cognitoIdentityId': None, 'apiKey': 'test-invoke-api-key', 'principalOrgId': None, 'cognitoAuthenticationType': None, 'userArn': 'arn:aws:sts::884500545225:assumed-role/power_user/wai.phyo@jpl.nasa.gov', 'apiKeyId': 'test-invoke-api-key-id', 'userAgent': 'aws-internal/3 aws-sdk-java/1.12.201 Linux/5.4.181-109.354.amzn2int.x86_64 OpenJDK_64-Bit_Server_VM/25.322-b06 java/1.8.0_322 vendor/Oracle_Corporation cfg/retry-mode/standard', 'accountId': '884500545225', 'caller': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov', 'sourceIp': 'test-invoke-source-ip', 'accessKey': 'ASIA434CXH3EV56T6AS5', 'cognitoAuthenticationProvider': None, 'user': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov'}, 'domainName': 'testPrefix.testDomainName', 'apiId': 'gwaxi7ijl4'}, 'body': None, 'isBase64Encoded': False}
'queryStringParameters': {'datetime': 'asfa;lsfdjafal', 'bbox': '12,12,12,3', 'limit': '12'},
'multiValueQueryStringParameters': {'datetime': ['asfa;lsfdjafal'], 'bbox': ['12,12,12,3'], 'limit': ['12']}, 'pathParameters': None, 'stageVariables': None, 'requestContext': {'resourceId': 'hgdxj6', 'resourcePath': '/collections/observation/items', 'httpMethod': 'GET', 'extendedRequestId': 'SSEa6F80PHcF3xg=', 'requestTime': '17/May/2022:18:20:40 +0000', 'path': '/collections/observation/items', 'accountId': '884500545225', 'protocol': 'HTTP/1.1', 'stage': 'test-invoke-stage', 'domainPrefix': 'testPrefix', 'requestTimeEpoch': 1652811640832, 'requestId': '703f404d-cb95-43d3-8f48-523e5b1860e4', 'identity': {'cognitoIdentityPoolId': None, 'cognitoIdentityId': None, 'apiKey': 'test-invoke-api-key', 'principalOrgId': None, 'cognitoAuthenticationType': None, 'userArn': 'arn:aws:sts::884500545225:assumed-role/power_user/wai.phyo@jpl.nasa.gov', 'apiKeyId': 'test-invoke-api-key-id', 'userAgent': 'aws-internal/3 aws-sdk-java/1.12.201 Linux/5.4.181-109.354.amzn2int.x86_64 OpenJDK_64-Bit_Server_VM/25.322-b06 java/1.8.0_322 vendor/Oracle_Corporation cfg/retry-mode/standard', 'accountId': '884500545225', 'caller': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov', 'sourceIp': 'test-invoke-source-ip', 'accessKey': 'ASIA434CXH3EV56T6AS5', 'cognitoAuthenticationProvider': None, 'user': 'AROAJZL4DI6MUSHCBBHGM:wai.phyo@jpl.nasa.gov'}, 'domainName': 'testPrefix.testDomainName', 'apiId': 'gwaxi7ijl4'}, 'body': None, 'isBase64Encoded': False}

:param event:
"""
Expand All @@ -40,6 +42,72 @@ def __init__(self, event):
self.__cumulus.with_page_number(self.__page_number)
self.__get_time_range()
self.__get_collection_id()
self.__get_filter()

def __get_filter(self):
    """
    Extract the CQL-JSON filter from the event, validate it, and apply it
    to the Cumulus query via ``with_filter``.

    Only the ``in`` predicate is supported; events without a ``filter`` key,
    or whose filter lacks ``in``, are ignored. Expected shape
    (https://portal.ogc.org/files/96288#rc_filter):

        {
            "in": {
                "value": { "property": "cityName" },
                "list": [ "Toronto", "Frankfurt", "Tokyo", "New York" ]
            }
        }

    :return: self
    """
    if 'filter' not in self.__event:
        return self
    filter_event = json.loads(self.__event['filter'])
    if 'in' not in filter_event:
        return self
    # BUG FIX: the original schema nested "required"/"properties" inside the
    # "type" keyword ({"type": {"required": ...}}), which is not valid JSON
    # Schema — "type" must be the string "object" — so nothing was validated.
    schema = {
        "type": "object",
        "required": ["in"],
        "properties": {
            "in": {
                "type": "object",
                "required": ["value", "list"],
                "properties": {
                    "value": {
                        "type": "object",
                        "required": ["property"],
                        "properties": {
                            "property": {"type": "string"}
                        }
                    },
                    "list": {
                        "type": "array",
                        "minItems": 1,
                        "items": {
                            "type": "string"
                        }
                    }
                }
            }
        }
    }
    filter_event_validator_result = JsonValidator(schema).validate(filter_event)
    if filter_event_validator_result is not None:
        LOGGER.error(f'invalid event: {filter_event_validator_result}. event: {filter_event}')
        return self
    search_key = filter_event['in']['value']['property']
    # BUG FIX: 'list' is a sibling of 'value' under 'in' (see the schema and
    # the callers); the original read filter_event['in']['value']['list'],
    # which raised KeyError for every valid filter.
    search_values = filter_event['in']['list']
    self.__cumulus.with_filter(search_key, search_values)
    return self

def __get_collection_id(self):
if 'pathParameters' not in self.__event:
Expand Down
10 changes: 10 additions & 0 deletions cumulus_lambda_functions/cumulus_stac/item_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,15 @@ def __init__(self):
super().__init__()
self.__stac_item_schema = json.loads(STAC_ITEM_SCHEMA)
self.__cumulus_granule_schema = {}
self.CUMULUS_2_STAC_KEYS_MAP = {
'beginningDateTime': 'start_datetime',
'endingDateTime': 'end_datetime',
'productionDateTime': 'created',
'updatedAt': 'updated',
'granuleId': 'id',
'collectionId': 'collection',
}
self.STAC_2_CUMULUS_KEYS_MAP = {v: k for k, v in self.CUMULUS_2_STAC_KEYS_MAP.items()}

def __get_asset_name(self, input_dict):
if input_dict['type'] == 'data':
Expand Down Expand Up @@ -380,6 +389,7 @@ def to_stac(self, source: dict) -> dict:
:param source:
:return:
"""

validation_result = JsonValidator(self.__cumulus_granule_schema).validate(source)
if validation_result is not None:
raise ValueError(f'invalid cumulus granule json: {validation_result}')
Expand Down
15 changes: 13 additions & 2 deletions cumulus_lambda_functions/cumulus_wrapper/query_granules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,22 @@ class GranulesQuery(CumulusBase):
__ending_time_key = 'endingDateTime'
__beginning_time_key = 'beginningDateTime'
__collection_id_key = 'collectionId'
__granules_id = 'granuleId'

def __init__(self, cumulus_base: str, cumulus_token: str):
    """
    :param cumulus_base: base URL of the Cumulus API
    :param cumulus_token: token used to authenticate against the Cumulus API
    """
    super().__init__(cumulus_base, cumulus_token)
    # only completed granules are ever queried
    self._conditions.append('status=completed')

    # used by with_filter() to translate STAC property names to Cumulus keys
    self._item_transformer = ItemTransformer()

def with_filter(self, filter_key, filter_values: list):
    """
    Restrict the query so `filter_key` (a STAC property name) must be one of
    `filter_values`. Empty value lists and unmapped keys are ignored.

    :param filter_key: STAC property name; translated to its Cumulus key
    :param filter_values: accepted values for that property
    :return: self
    """
    if not filter_values:
        return self
    key_map = self._item_transformer.STAC_2_CUMULUS_KEYS_MAP
    if filter_key not in key_map:
        LOGGER.error(f'unknown key in STAC_2_CUMULUS_KEYS_MAP: {filter_key} ')
        return self
    cumulus_key = key_map[filter_key]
    joined_values = ','.join(filter_values)
    self._conditions.append(f'{cumulus_key}__in={joined_values}')
    return self

def with_collection_id(self, collection_id: str):
self._conditions.append(f'{self.__collection_id_key}={collection_id}')
Expand Down Expand Up @@ -88,7 +99,7 @@ def query_direct_to_private_api(self, private_api_prefix: str):
'httpMethod': 'GET',
'resource': '/{proxy+}',
'path': f'/{self.__granules_key}',
'queryStringParameters': {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]},
'queryStringParameters': {**{k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}},
# 'queryStringParameters': {'limit': '30'},
'headers': {
'Content-Type': 'application/json',
Expand Down
24 changes: 23 additions & 1 deletion cumulus_lambda_functions/stage_in_out/catalog_granules_unity.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import json

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.lib.time_utils import TimeUtils
from cumulus_lambda_functions.stage_in_out.catalog_granules_abstract import CatalogGranulesAbstract
import logging
import os

from cumulus_lambda_functions.stage_in_out.cataloging_granules_status_checker import CatalogingGranulesStatusChecker

LOGGER = logging.getLogger(__name__)


class CatalogGranulesUnity(CatalogGranulesAbstract):
PROVIDER_ID_KEY = 'PROVIDER_ID'
VERIFY_SSL_KEY = 'VERIFY_SSL'
DELAY_SECOND = 'DELAY_SECOND'
REPEAT_TIMES = 'REPEAT_TIMES'

def __init__(self) -> None:
    """Initialize with defaults; real values are read in __set_props_from_env()."""
    super().__init__()
    # Cumulus provider id (env: PROVIDER_ID)
    self.__provider_id = ''
    # whether HTTPS certificates are verified (env: VERIFY_SSL)
    self.__verify_ssl = True
    # seconds to wait between catalog status checks (env: DELAY_SECOND)
    self.__delaying_second = 30
    # maximum number of catalog status checks (env: REPEAT_TIMES)
    self.__repeating_times = 5

def __set_props_from_env(self):
missing_keys = [k for k in [self.UPLOADED_FILES_JSON, self.PROVIDER_ID_KEY] if k not in os.environ]
Expand All @@ -25,6 +31,9 @@ def __set_props_from_env(self):
self._retrieve_stac_json()
self.__provider_id = os.environ.get(self.PROVIDER_ID_KEY)
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
self.__delaying_second = int(os.environ.get(self.DELAY_SECOND, '30'))
self.__repeating_times = int(os.environ.get(self.REPEAT_TIMES, '30'))
self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
return self

def catalog(self, **kwargs):
Expand All @@ -38,4 +47,17 @@ def catalog(self, **kwargs):
dapa_client = DapaClient().with_verify_ssl(self.__verify_ssl)
LOGGER.debug(f'dapa_body_granules: {dapa_body}')
dapa_ingest_result = dapa_client.ingest_granules_w_cnm(dapa_body)
return json.dumps(dapa_ingest_result)
extracted_ids = [k['id'] for k in self._uploaded_files_json]
LOGGER.debug(f'checking following IDs: {extracted_ids}')
status_checker = CatalogingGranulesStatusChecker(self._uploaded_files_json[0]['collection'],
extracted_ids,
TimeUtils().get_datetime_obj().timestamp(),
self.__delaying_second,
self.__repeating_times,
self.__verify_ssl)
status_result = status_checker.verify_n_times()
response_json = {
'cataloging_request_status': dapa_ingest_result,
'status_result': status_result
}
return json.dumps(response_json)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging
import time

from cumulus_lambda_functions.cumulus_dapa_client.dapa_client import DapaClient
from cumulus_lambda_functions.cumulus_stac.item_transformer import ItemTransformer

LOGGER = logging.getLogger(__name__)


class CatalogingGranulesStatusChecker:
    """
    Repeatedly queries the DAPA granules endpoint to confirm that newly
    submitted granules have been cataloged after a given point in time.
    """

    def __init__(self, collection_id: str, granules_ids: list, threshold_datetime: int, delay: int, repeating_times: int, veriffy_ssl=False):
        """
        :param collection_id: collection the granules belong to
        :param granules_ids: granule IDs expected to appear in the catalog
        :param threshold_datetime: epoch timestamp; only granules whose item
            datetime is at or after this count as newly cataloged
        :param delay: seconds to sleep before each verification attempt
        :param repeating_times: number of attempts; values < 1 are coerced to 1
        :param veriffy_ssl: forwarded to DapaClient.with_verify_ssl().
            NOTE(review): name is a typo for "verify_ssl" — kept as-is because
            renaming would break keyword callers; confirm before fixing.
        """
        self.__collection_id = collection_id
        self.__granules_ids = granules_ids
        self.__threshold_datetime = threshold_datetime
        self.__delay = delay
        # guarantee at least one verification attempt
        self.__repeating_times = repeating_times if repeating_times > 0 else 1
        self.__dapa_client = DapaClient().with_verify_ssl(veriffy_ssl)
        # granule id -> STAC item, rebuilt on each verify_one_time() call
        self.__registered_granules = {}

    def verify_one_time(self):
        """
        Query DAPA once for the expected granule IDs and report which are
        still missing.

        :return: dict with keys 'cataloged' (bool: nothing missing),
            'missing_granules' (list of IDs) and 'registered_granules'
            (list of STAC item dicts)
        """
        registered_granules = self.__dapa_client.get_granules(collection_id=self.__collection_id,
                                                              filters={
                                                                  'in': {
                                                                      'value': {'property': 'id'},
                                                                      'list': self.__granules_ids
                                                                  }
                                                              })
        LOGGER.debug(f'raw registered_granules: {registered_granules}')
        registered_granules = [ItemTransformer().from_stac(k) for k in registered_granules['features']]
        # keep only granules cataloged at/after the threshold time, so stale
        # entries from a previous ingestion are not counted
        self.__registered_granules = {k.id: k for k in registered_granules if
                                      k.datetime.timestamp() >= self.__threshold_datetime}
        LOGGER.debug(f'registered_granules after filtering: {[k for k in self.__registered_granules.keys()]}')
        LOGGER.debug(f'comparison queried v. expected: {len(self.__registered_granules)} v. {len(self.__granules_ids)}')
        missing_granules = [k for k in self.__granules_ids if k not in self.__registered_granules]
        return {
            'cataloged': len(missing_granules) < 1,
            'missing_granules': missing_granules,
            'registered_granules': [v.to_dict(include_self_link=False, transform_hrefs=False) for v in self.__registered_granules.values()]
        }

    def verify_n_times(self):
        """
        Run verify_one_time() up to the configured number of attempts,
        sleeping the configured delay before each attempt, stopping early
        once no granules are missing.

        :return: the last verification result (see verify_one_time())
        """
        verify_result = {
            'missing_granules': [],
            'registered_granules': []
        }
        for i in range(self.__repeating_times):
            # sleep first: granules need time to be cataloged after ingestion
            time.sleep(self.__delay)
            verify_result = self.verify_one_time()
            LOGGER.debug(f'time {i} verification result: {verify_result}')
            if len(verify_result['missing_granules']) < 1:
                return verify_result
        return verify_result

4 changes: 4 additions & 0 deletions docker/stage-in-stage-out/dc-004-catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ services:
PROVIDER_ID: 'SNPP'
GRANULES_CATALOG_TYPE: 'UNITY'
UPLOADED_FILES_JSON: '[{"id": "NEW_COLLECTION_EXAMPLE_L1B___9:test_file01", "collection": "NEW_COLLECTION_EXAMPLE_L1B___9", "assets": {"metadata": {"href": "s3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc.cas"}, "data": {"href": "s3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc"}}}]'

DELAY_SECOND: '45'
REPEAT_TIMES: '5'

networks:
- internal
networks:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

setup(
name="cumulus_lambda_functions",
version="5.0.1",
version="5.1.0",
packages=find_packages(),
install_requires=install_requires,
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@ def test_01(self):
self.assertTrue('results' in granules, f'results not in collections: {granules}')
self.assertEqual(7, len(granules['results']), f'wrong length: {granules}')
return

def test_02(self):
    """Integration test: filter granules by granule ID against the private Cumulus API."""
    lambda_prefix = 'uds-dev-cumulus'
    expected_ids = ['NEW_COLLECTION_EXAMPLE_L1B___9:test_file01', 'NEW_COLLECTION_EXAMPLE_L1B___9:test_file02']

    granules_query = GranulesQuery('NA', 'NA')
    granules_query.with_collection_id('NEW_COLLECTION_EXAMPLE_L1B___9')
    granules_query.with_limit(2)
    granules_query.with_filter('id', expected_ids)
    granules = granules_query.query_direct_to_private_api(lambda_prefix)
    self.assertTrue('results' in granules, f'results not in collections: {granules}')
    self.assertEqual(2, len(granules['results']), f'wrong length: {granules}')
    for result, expected_id in zip(granules['results'], expected_ids):
        self.assertEqual(result['id'], expected_id)
    return
Loading