Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

breaking: search to return feature-collection. download to read feature-collection + return localized feature-collection w/ relative paths #161

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [4.0.0] - 2023-06-13
### Changed
- [#161](https://github.com/unity-sds/unity-data-services/pull/161) breaking: search to return feature-collection. download to read feature-collection + return localized feature-collection w/ relative paths

## [3.8.2] - 2023-05-23
### Added
- [#154](https://github.com/unity-sds/unity-data-services/pull/154) fix: production datetime not in +00:00 format
Expand Down
12 changes: 9 additions & 3 deletions cumulus_lambda_functions/cumulus_dapa_client/dapa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,23 @@ def get_all_granules(self, collection_id='*', limit=-1, date_from='', date_to=''
results = []
page_size = 100 if limit < 0 or limit > 100 else limit
offset = 0
items_collection_shell = None
while True:
if 0 < limit <= len(results):
break
temp_results = self.get_granules(collection_id, page_size, offset, date_from, date_to)
if items_collection_shell is None:
items_collection_shell: dict = temp_results
temp_results = temp_results.pop('features')
offset += len(temp_results)
results.extend(temp_results)
if len(temp_results) < page_size:
break
if limit < 0 or limit >= len(results):
return results
return results[0: limit]
items_collection_shell['features'] = results
return items_collection_shell
items_collection_shell['features'] = results[0: limit]
return items_collection_shell

def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', date_to=''):
"""
Expand All @@ -100,7 +106,7 @@ def get_granules(self, collection_id='*', limit=1000, offset=0, date_from='', da
granules_result = json.loads(response.text)
if 'features' not in granules_result:
raise RuntimeError(f'missing features in response. invalid response: response: {granules_result}')
return granules_result['features']
return granules_result

def ingest_granules_w_cnm(self, cnm_ingest_body: dict) -> str:
dapa_ingest_cnm_api = f'{self.__dapa_base_api}/am-uds-dapa/collections/'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
from abc import ABC, abstractmethod

from pystac import ItemCollection, Asset, Item

from cumulus_lambda_functions.lib.utils.file_utils import FileUtils

LOGGER = logging.getLogger(__name__)
Expand All @@ -11,11 +13,34 @@
class DownloadGranulesAbstract(ABC):
STAC_JSON = 'STAC_JSON'
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
DOWNLOADING_KEYS = 'DOWNLOADING_KEYS'

def __init__(self) -> None:
    """Initialize shared downloader state with safe defaults.

    The concrete subclass is expected to overwrite these via
    _set_props_from_env() before download() runs.
    """
    super().__init__()
    # Parsed input feature collection; populated later by _retrieve_stac_json().
    self._granules_json: ItemCollection = {}
    # Target directory for downloaded files; overridden by _setup_download_dir().
    self._download_dir = '/tmp'
    # Comma-separated whitelist of asset names to download (env override, default 'data').
    raw_keys = os.environ.get(self.DOWNLOADING_KEYS, 'data')
    self._downloading_keys = {single_key.strip() for single_key in raw_keys.strip().split(',')}

def _set_props_from_env(self):
raise NotImplementedError(f'to be implemented in concrete classes')

def _download_one_item(self, downloading_url):
raise NotImplementedError(f'to be implemented in concrete classes')

def _download_one_granule_item(self, granule_item: Item) -> Item:
    """Download the whitelisted assets of a single STAC item.

    Assets whose name is not in self._downloading_keys are skipped and
    dropped from the item.  Each downloaded asset's href is rewritten to a
    relative local path ('./<basename>') so the returned item describes the
    localized copy.

    :param granule_item: STAC item whose assets should be downloaded
    :return: the same item, mutated to reference the local files
    """
    new_asset_dict = {}
    for name, asset in granule_item.assets.items():
        if name not in self._downloading_keys:
            LOGGER.debug(f'skipping {name}. Not in downloading keys')
            continue
        downloading_url = asset.href
        LOGGER.debug(f'downloading: {downloading_url}')
        self._download_one_item(downloading_url)
        # point the asset at the local copy, relative to the download dir
        asset.href = os.path.join('.', os.path.basename(downloading_url))
        new_asset_dict[name] = asset
    # assets not in the whitelist are intentionally discarded here
    granule_item.assets = new_asset_dict
    return granule_item

def _setup_download_dir(self):
self._download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
Expand All @@ -27,17 +52,40 @@ def _setup_download_dir(self):
def _retrieve_stac_json(self):
    """Populate self._granules_json from the STAC_JSON environment variable.

    STAC_JSON may hold either the raw feature-collection JSON text or a
    path to a JSON file on disk; both forms are parsed into a pystac
    ItemCollection.

    :raises ValueError: when STAC_JSON is neither inline JSON nor a path
        to a readable JSON file
    :return: self (fluent style, matching the other setup helpers)
    """
    raw_stac_json = os.environ.get(self.STAC_JSON)
    try:
        self._granules_json = ItemCollection.from_dict(json.loads(raw_stac_json))
        return self
    # NOTE: narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
    # are no longer swallowed; any parse failure still falls through to the
    # file-path interpretation below.
    except Exception:
        LOGGER.debug(f'raw_stac_json is not STAC_JSON: {raw_stac_json}. trying to see if file exists')
        if not FileUtils.file_exist(raw_stac_json):
            raise ValueError(f'missing file or not JSON: {raw_stac_json}')
        json_stac = FileUtils.read_json(raw_stac_json)
        if json_stac is None:
            raise ValueError(f'{raw_stac_json} is not JSON')
        self._granules_json = ItemCollection.from_dict(json_stac)
    return self

@abstractmethod
def download(self, **kwargs) -> list:
raise NotImplementedError()
def download(self, **kwargs) -> dict:
    """Download every granule in the input feature collection.

    Reads configuration via _set_props_from_env(), downloads each STAC
    item's whitelisted assets, rewrites the collection so asset hrefs are
    relative local paths, writes the localized feature collection (plus an
    error.log when any download failed) into the download directory, and
    returns the localized feature collection as a dict.
    """
    self._set_props_from_env()
    LOGGER.debug(f'creating download dir: {self._download_dir}')
    if len(self._granules_json.items) < 1:
        LOGGER.warning(f'cannot find any granules')
        # nothing to download: return the (empty) feature collection as-is
        return self._granules_json.to_dict(False)
    local_items = []
    error_list = []
    for each_item in self._granules_json.items:
        try:
            local_item = self._download_one_granule_item(each_item)
            local_items.append(local_item)
        except Exception as e:
            # one bad granule must not abort the whole batch; record and continue
            LOGGER.exception(f'error downloading granule: {each_item.id}')
            error_list.append({'error': str(e), 'id': each_item.id, })
    LOGGER.debug(f'finished downloading all granules')
    self._granules_json.items = local_items
    LOGGER.debug(f'writing features collection json to downloading directory')
    # to_dict(False) — presumably keeps hrefs as-is without resolving; TODO confirm against pystac docs
    granules_json_dict = self._granules_json.to_dict(False)
    FileUtils.write_json(os.path.join(self._download_dir, 'downloaded_feature_collection.json'), granules_json_dict, overwrite=True, prettify=True)
    LOGGER.debug(f'writing errors if any')
    if len(error_list) > 0:
        with open(f'{self._download_dir}/error.log', 'w') as error_file:
            error_file.write(json.dumps(error_list, indent=4))
    return granules_json_dict
79 changes: 8 additions & 71 deletions cumulus_lambda_functions/stage_in_out/download_granules_daac.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self) -> None:
super().__init__()
self.__edl_token = None

def __set_props_from_env(self):
def _set_props_from_env(self):
missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')
Expand All @@ -25,76 +25,13 @@ def __set_props_from_env(self):
self.__edl_token = URSTokenRetriever().start()
return self

def __get_downloading_urls(self, granules_result: list):
if len(granules_result) < 1:
LOGGER.warning(f'cannot find any granules')
return []
downloading_urls = [k['assets'] for k in granules_result]
return downloading_urls

def __download_one_granule(self, assets: dict):
"""
sample assets
{
"data": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS",
"title": "P1570515ATMSSCIENCEAAT16017044853900.PDS",
"description": "P1570515ATMSSCIENCEAAT16017044853900.PDS"
},
"metadata__data": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS",
"title": "P1570515ATMSSCIENCEAAT16017044853901.PDS",
"description": "P1570515ATMSSCIENCEAAT16017044853901.PDS"
},
"metadata__xml": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
"title": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
"description": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml"
},
"metadata__cmr": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
"title": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
"description": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml"
}
}
:param assets:
:return:
"""
error_log = []
def _download_one_item(self, downloading_url):
headers = {
'Authorization': f'Bearer {self.__edl_token}'
}
local_item = {}
for k, v in assets.items():
local_item[k] = v
try:
LOGGER.debug(f'downloading: {v["href"]}')
r = requests.get(v['href'], headers=headers)
if r.status_code >= 400:
raise RuntimeError(f'wrong response status: {r.status_code}. details: {r.content}')
# TODO. how to correctly check redirecting to login page
local_file_path = os.path.join(self._download_dir, os.path.basename(v["href"]))
with open(local_file_path, 'wb') as fd:
fd.write(r.content)
local_item[k]['href'] = local_file_path
except Exception as e:
LOGGER.exception(f'failed to download {v}')
local_item[k]['description'] = f'download failed. {str(e)}'
error_log.append(v)
return local_item, error_log

def download(self, **kwargs) -> list:
self.__set_props_from_env()
LOGGER.debug(f'creating download dir: {self._download_dir}')
downloading_urls = self.__get_downloading_urls(self._granules_json)
error_list = []
local_items = []
for each in downloading_urls:
LOGGER.debug(f'working on {each}')
local_item, current_error_list = self.__download_one_granule(each)
error_list.extend(current_error_list)
local_items.append({'assets': local_item})
if len(error_list) > 0:
with open(f'{self._download_dir}/error.log', 'w') as error_file:
error_file.write(json.dumps(error_list, indent=4))
return local_items
r = requests.get(downloading_url, headers=headers)
r.raise_for_status()
local_file_path = os.path.join(self._download_dir, os.path.basename(downloading_url))
with open(local_file_path, 'wb') as fd:
fd.write(r.content)
return local_file_path
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

class DownloadGranulesFactory:
S3 = 'S3'
HTTP = 'HTTP'
DAAC = 'DAAC'

def get_class(self, search_type):
Expand All @@ -11,4 +12,7 @@ def get_class(self, search_type):
elif search_type == DownloadGranulesFactory.DAAC:
from cumulus_lambda_functions.stage_in_out.download_granules_daac import DownloadGranulesDAAC
return DownloadGranulesDAAC()
elif search_type == DownloadGranulesFactory.HTTP:
from cumulus_lambda_functions.stage_in_out.download_granules_http import DownloadGranulesHttp
return DownloadGranulesHttp()
raise ValueError(f'unknown search_type: {search_type}')
33 changes: 33 additions & 0 deletions cumulus_lambda_functions/stage_in_out/download_granules_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import shutil

import requests

from cumulus_lambda_functions.stage_in_out.download_granules_abstract import DownloadGranulesAbstract
import json
import logging
import os

LOGGER = logging.getLogger(__name__)


class DownloadGranulesHttp(DownloadGranulesAbstract):
    """Downloads granule assets over plain HTTP(S) into a local directory."""

    def __init__(self) -> None:
        super().__init__()

    def _set_props_from_env(self):
        """Validate required env vars, load the input STAC json, and create the download dir."""
        missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')
        self._retrieve_stac_json()
        self._setup_download_dir()
        return self

    def _download_one_item(self, downloading_url):
        """Stream one URL to <download_dir>/<basename> and return the local path.

        BUG FIX: requests.get must be called with stream=True — without it
        the body is consumed eagerly into response.content, response.raw is
        already exhausted, and shutil.copyfileobj writes an empty file.
        """
        downloading_response = requests.get(downloading_url, stream=True)
        downloading_response.raise_for_status()
        # transparently decompress gzip/deflate transfer encodings while streaming
        downloading_response.raw.decode_content = True
        local_file_path = os.path.join(self._download_dir, os.path.basename(downloading_url))
        with open(local_file_path, 'wb') as f:
            shutil.copyfileobj(downloading_response.raw, f)
        return local_file_path
69 changes: 4 additions & 65 deletions cumulus_lambda_functions/stage_in_out/download_granules_s3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from cumulus_lambda_functions.stage_in_out.download_granules_abstract import DownloadGranulesAbstract
import json
import logging
import os

Expand All @@ -14,74 +13,14 @@ def __init__(self) -> None:
super().__init__()
self.__s3 = AwsS3()

def __set_props_from_env(self):
def _set_props_from_env(self):
    """Validate required env vars, load the input STAC json, and create the download dir."""
    # Fail fast when the required configuration is absent.
    required_keys = [self.STAC_JSON, self.DOWNLOAD_DIR_KEY]
    missing_keys = [k for k in required_keys if k not in os.environ]
    if missing_keys:
        raise ValueError(f'missing environment keys: {missing_keys}')
    self._retrieve_stac_json()
    self._setup_download_dir()
    return self

def __get_downloading_urls(self, granules_result: list):
if len(granules_result) < 1:
LOGGER.warning(f'cannot find any granules')
return []
downloading_urls = [k['assets'] for k in granules_result]
return downloading_urls

def __download_one_granule(self, assets: dict):
"""
sample assets
{
"data": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS",
"title": "P1570515ATMSSCIENCEAAT16017044853900.PDS",
"description": "P1570515ATMSSCIENCEAAT16017044853900.PDS"
},
"metadata__data": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS",
"title": "P1570515ATMSSCIENCEAAT16017044853901.PDS",
"description": "P1570515ATMSSCIENCEAAT16017044853901.PDS"
},
"metadata__xml": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
"title": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
"description": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml"
},
"metadata__cmr": {
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
"title": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
"description": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml"
}
}
:param assets:
:return:
"""
error_log = []
local_item = {}
for k, v in assets.items():
local_item[k] = v
try:
LOGGER.debug(f'downloading: {v["href"]}')
local_file_path = self.__s3.set_s3_url(v['href']).download(self._download_dir)
local_item[k]['href'] = local_file_path
except Exception as e:
LOGGER.exception(f'failed to download {v}')
local_item[k]['description'] = f'download failed. {str(e)}'
error_log.append(v)
return local_item, error_log

def download(self, **kwargs) -> list:
self.__set_props_from_env()
downloading_urls = self.__get_downloading_urls(self._granules_json)
error_list = []
local_items = []
for each in downloading_urls:
LOGGER.debug(f'working on {each}')
local_item, current_error_list = self.__download_one_granule(each)
local_items.append({'assets': local_item})
error_list.extend(current_error_list)
if len(error_list) > 0:
with open(f'{self._download_dir}/error.log', 'w') as error_file:
error_file.write(json.dumps(error_list, indent=4))
return local_items
def _download_one_item(self, downloading_url):
    """Download one s3:// URL into the download dir; return the local file path."""
    # Delegate entirely to the AwsS3 helper: point it at the URL, then
    # download into the configured directory.
    return self.__s3.set_s3_url(downloading_url).download(self._download_dir)
Loading