Skip to content

Commit 549e8b9

Browse files
committed
breaking: daac downloader accepts only features collection
1 parent d6e9c4b commit 549e8b9

File tree

4 files changed

+522
-75
lines changed

4 files changed

+522
-75
lines changed

cumulus_lambda_functions/stage_in_out/download_granules_abstract.py

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,8 @@
33
import os
44
from abc import ABC, abstractmethod
55

6+
from pystac import ItemCollection
7+
68
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
79

810
LOGGER = logging.getLogger(__name__)
@@ -11,11 +13,13 @@
1113
class DownloadGranulesAbstract(ABC):
1214
STAC_JSON = 'STAC_JSON'
1315
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
16+
DOWNLOADING_KEYS = 'DOWNLOADING_KEYS'
1417

1518
def __init__(self) -> None:
1619
super().__init__()
17-
self._granules_json = []
20+
self._granules_json: ItemCollection = {}
1821
self._download_dir = '/tmp'
22+
self._downloading_keys = set([k.strip() for k in os.environ.get(self.DOWNLOADING_KEYS, 'data').strip().split(',')])
1923

2024
def _setup_download_dir(self):
2125
self._download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
@@ -27,17 +31,18 @@ def _setup_download_dir(self):
2731
def _retrieve_stac_json(self):
2832
raw_stac_json = os.environ.get(self.STAC_JSON)
2933
try:
30-
self._granules_json = json.loads(raw_stac_json)
34+
self._granules_json = ItemCollection.from_dict(json.loads(raw_stac_json))
3135
return self
3236
except:
3337
LOGGER.debug(f'raw_stac_json is not STAC_JSON: {raw_stac_json}. trying to see if file exists')
3438
if not FileUtils.file_exist(raw_stac_json):
3539
raise ValueError(f'missing file or not JSON: {raw_stac_json}')
36-
self._granules_json = FileUtils.read_json(raw_stac_json)
37-
if self._granules_json is None:
40+
json_stac = FileUtils.read_json(raw_stac_json)
41+
if json_stac is None:
3842
raise ValueError(f'{raw_stac_json} is not JSON')
43+
self._granules_json = ItemCollection.from_dict(json_stac)
3944
return self
4045

4146
@abstractmethod
42-
def download(self, **kwargs) -> list:
47+
def download(self, **kwargs) -> dict:
4348
raise NotImplementedError()
Lines changed: 39 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
import requests
2+
from pystac import ItemCollection, Item, Asset
23

34
from cumulus_lambda_functions.lib.earthdata_login.urs_token_retriever import URSTokenRetriever
5+
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
46
from cumulus_lambda_functions.stage_in_out.download_granules_abstract import DownloadGranulesAbstract
57
import json
68
import logging
@@ -25,76 +27,50 @@ def __set_props_from_env(self):
2527
self.__edl_token = URSTokenRetriever().start()
2628
return self
2729

28-
def __get_downloading_urls(self, granules_result: list):
29-
if len(granules_result) < 1:
30-
LOGGER.warning(f'cannot find any granules')
31-
return []
32-
downloading_urls = [k['assets'] for k in granules_result]
33-
return downloading_urls
34-
35-
def __download_one_granule(self, assets: dict):
36-
"""
37-
sample assets
38-
{
39-
"data": {
40-
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS",
41-
"title": "P1570515ATMSSCIENCEAAT16017044853900.PDS",
42-
"description": "P1570515ATMSSCIENCEAAT16017044853900.PDS"
43-
},
44-
"metadata__data": {
45-
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS",
46-
"title": "P1570515ATMSSCIENCEAAT16017044853901.PDS",
47-
"description": "P1570515ATMSSCIENCEAAT16017044853901.PDS"
48-
},
49-
"metadata__xml": {
50-
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
51-
"title": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml",
52-
"description": "P1570515ATMSSCIENCEAAT16017044853901.PDS.xml"
53-
},
54-
"metadata__cmr": {
55-
"href": "s3://am-uds-dev-cumulus-internal/ATMS_SCIENCE_Group___1/P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
56-
"title": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml",
57-
"description": "P1570515ATMSSCIENCEAAT16017044853900.PDS.cmr.xml"
58-
}
59-
}
60-
:param assets:
61-
:return:
62-
"""
63-
error_log = []
30+
def __download_one_granule_item(self, granule_item: Item):
6431
headers = {
6532
'Authorization': f'Bearer {self.__edl_token}'
6633
}
67-
local_item = {}
68-
for k, v in assets.items():
69-
local_item[k] = v
70-
try:
71-
LOGGER.debug(f'downloading: {v["href"]}')
72-
r = requests.get(v['href'], headers=headers)
73-
if r.status_code >= 400:
74-
raise RuntimeError(f'wrong response status: {r.status_code}. details: {r.content}')
75-
# TODO. how to correctly check redirecting to login page
76-
local_file_path = os.path.join(self._download_dir, os.path.basename(v["href"]))
77-
with open(local_file_path, 'wb') as fd:
78-
fd.write(r.content)
79-
local_item[k]['href'] = local_file_path
80-
except Exception as e:
81-
LOGGER.exception(f'failed to download {v}')
82-
local_item[k]['description'] = f'download failed. {str(e)}'
83-
error_log.append(v)
84-
return local_item, error_log
34+
new_asset_dict = {}
35+
for name, value_dict in granule_item.assets.items():
36+
if name not in self._downloading_keys:
37+
LOGGER.debug(f'skipping {name}. Not in downloading keys')
38+
continue
39+
value_dict: Asset = value_dict
40+
downloading_url = value_dict.href
41+
LOGGER.debug(f'downloading: {downloading_url}')
42+
r = requests.get(downloading_url, headers=headers)
43+
r.raise_for_status()
44+
local_file_path = os.path.join(self._download_dir, os.path.basename(downloading_url))
45+
with open(local_file_path, 'wb') as fd:
46+
fd.write(r.content)
47+
value_dict.href = os.path.join('.', os.path.basename(downloading_url))
48+
new_asset_dict[name] = value_dict
49+
granule_item.assets = new_asset_dict
50+
return granule_item
8551

86-
def download(self, **kwargs) -> list:
52+
def download(self, **kwargs) -> dict:
8753
self.__set_props_from_env()
8854
LOGGER.debug(f'creating download dir: {self._download_dir}')
89-
downloading_urls = self.__get_downloading_urls(self._granules_json)
90-
error_list = []
55+
if len(self._granules_json.items) < 1:
56+
LOGGER.warning(f'cannot find any granules')
57+
return self._granules_json.to_dict(False)
9158
local_items = []
92-
for each in downloading_urls:
93-
LOGGER.debug(f'working on {each}')
94-
local_item, current_error_list = self.__download_one_granule(each)
95-
error_list.extend(current_error_list)
96-
local_items.append({'assets': local_item})
59+
error_list = []
60+
for each_item in self._granules_json.items:
61+
try:
62+
local_item = self.__download_one_granule_item(each_item)
63+
local_items.append(local_item)
64+
except Exception as e:
65+
LOGGER.exception(f'error downloading granule: {each_item.id}')
66+
error_list.append({'error': str(e), 'id': each_item.id, })
67+
LOGGER.debug(f'finished downloading all granules')
68+
self._granules_json.items = local_items
69+
LOGGER.debug(f'writing features collection json to downloading directory')
70+
granules_json_dict = self._granules_json.to_dict(False)
71+
FileUtils.write_json(os.path.join(self._download_dir, 'downloaded_feature_collection.json'), granules_json_dict, overwrite=True, prettify=True)
72+
LOGGER.debug(f'writing errors if any')
9773
if len(error_list) > 0:
9874
with open(f'{self._download_dir}/error.log', 'w') as error_file:
9975
error_file.write(json.dumps(error_list, indent=4))
100-
return local_items
76+
return granules_json_dict

0 commit comments

Comments
 (0)