Skip to content

Commit e03f0e5

Browse files
authored
Merge pull request #143 from unity-sds/develop
release/3.6.0
2 parents 96b84b1 + 08d036b commit e03f0e5

File tree

11 files changed

+619
-50
lines changed

11 files changed

+619
-50
lines changed

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,16 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [3.6.0] - 2023-04-24
9+
### Added
10+
- [#142](https://github.com/unity-sds/unity-data-services/pull/142) feat: Support DAAC download files stac file, not just direct json text
11+
12+
## [3.5.0] - 2023-04-18
13+
### Added
14+
- [#138](https://github.com/unity-sds/unity-data-services/pull/138) feat: Checkout stage with STAC catalog json
15+
816
## [3.4.0] - 2023-04-17
9-
### Changed
17+
### Added
1018
- [#132](https://github.com/unity-sds/unity-data-services/pull/132) feat: add DAAC download logic
1119

1220
## [3.3.1] - 2023-04-13
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from pystac import Catalog, Item, Asset
2+
3+
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
4+
5+
6+
class GranulesCatalog:
    """Helper for reading and updating STAC catalog / item JSON files on disk."""

    def get_child_link_hrefs(self, catalog_file_path: str):
        """Return the hrefs of every 'child' link in the given STAC catalog file.

        :param catalog_file_path: path to a STAC catalog JSON file
        :raises ValueError: if the file does not exist
        """
        if not FileUtils.file_exist(catalog_file_path):
            raise ValueError(f'missing file: {catalog_file_path}')
        parsed_catalog = Catalog.from_dict(FileUtils.read_json(catalog_file_path))
        return [each_link.href for each_link in parsed_catalog.get_links(rel='child')]

    def get_granules_item(self, granule_stac_json) -> Item:
        """Load a granule STAC Item from a JSON file path.

        :param granule_stac_json: path to a STAC item JSON file
        :raises ValueError: if the file does not exist
        """
        if not FileUtils.file_exist(granule_stac_json):
            raise ValueError(f'missing file: {granule_stac_json}')
        return Item.from_dict(FileUtils.read_json(granule_stac_json))

    def extract_assets_href(self, granules_stac: Item) -> dict:
        """Return a mapping of asset key -> asset href for the given item."""
        return {asset_name: each_asset.href for asset_name, each_asset in granules_stac.get_assets().items()}

    def update_assets_href(self, granules_stac: Item, new_assets: dict):
        """Overwrite existing asset hrefs on the item, adding assets for new keys.

        :param granules_stac: STAC item to mutate in place
        :param new_assets: mapping of asset key -> new href
        :return: self, for fluent chaining
        """
        for asset_name, asset_href in new_assets.items():
            if asset_name in granules_stac.assets:
                # existing asset: only its href changes
                granules_stac.assets.get(asset_name).href = asset_href
            else:
                # new asset: created with the key as its title (original behavior)
                granules_stac.add_asset(asset_name, Asset(asset_href, asset_name))
        return self
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,43 @@
1+
import json
2+
import logging
3+
import os
14
from abc import ABC, abstractmethod
25

6+
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
7+
8+
LOGGER = logging.getLogger(__name__)
9+
310

411
class DownloadGranulesAbstract(ABC):
    """Base class for granule downloaders.

    Centralizes the logic shared by concrete downloaders: resolving the
    download directory and loading the granules STAC JSON from the
    environment.
    """

    # Env var holding either raw STAC JSON text or a path to a JSON file.
    STAC_JSON = 'STAC_JSON'
    # Env var holding the directory where downloaded files are written.
    DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'

    def __init__(self) -> None:
        super().__init__()
        self._granules_json = []  # parsed granules JSON; populated by _retrieve_stac_json()
        self._download_dir = '/tmp'  # default until _setup_download_dir() runs

    def _setup_download_dir(self):
        """Read DOWNLOAD_DIR from the environment, normalize it, and create it.

        :return: self, for fluent chaining
        """
        self._download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
        # drop a single trailing slash so later path joins don't produce '//'
        self._download_dir = self._download_dir[:-1] if self._download_dir.endswith('/') else self._download_dir
        LOGGER.debug(f'creating download dir: {self._download_dir}')
        FileUtils.mk_dir_p(self._download_dir)
        return self

    def _retrieve_stac_json(self):
        """Load granules JSON from the STAC_JSON environment variable.

        The value may be either inline JSON text or a path to a JSON file;
        the inline form is tried first.

        :return: self, for fluent chaining
        :raises ValueError: if the value is neither valid JSON nor an
            existing JSON file
        """
        raw_stac_json = os.environ.get(self.STAC_JSON)
        try:
            self._granules_json = json.loads(raw_stac_json)
            return self
        # Narrowed from a bare `except:` which also swallowed SystemExit /
        # KeyboardInterrupt. json.JSONDecodeError covers invalid JSON text;
        # TypeError covers raw_stac_json being None (env var unset).
        except (json.JSONDecodeError, TypeError):
            LOGGER.debug(f'raw_stac_json is not STAC_JSON: {raw_stac_json}. trying to see if file exists')
            if not FileUtils.file_exist(raw_stac_json):
                raise ValueError(f'missing file or not JSON: {raw_stac_json}')
            self._granules_json = FileUtils.read_json(raw_stac_json)
            if self._granules_json is None:
                raise ValueError(f'{raw_stac_json} is not JSON')
        return self

    @abstractmethod
    def download(self, **kwargs) -> list:
        """Download all granules; implemented by concrete subclasses."""
        raise NotImplementedError()

cumulus_lambda_functions/stage_in_out/download_granules_daac.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,21 @@
66
import logging
77
import os
88

9-
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
10-
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
11-
129
LOGGER = logging.getLogger(__name__)
1310

1411

1512
class DownloadGranulesDAAC(DownloadGranulesAbstract):
16-
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
17-
STAC_JSON = 'STAC_JSON'
1813

1914
def __init__(self) -> None:
2015
super().__init__()
21-
self.__download_dir = '/tmp'
22-
self.__s3 = AwsS3()
23-
self.__granules_json = []
2416
self.__edl_token = None
2517

2618
def __set_props_from_env(self):
2719
missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
2820
if len(missing_keys) > 0:
2921
raise ValueError(f'missing environment keys: {missing_keys}')
30-
self.__granules_json = json.loads(os.environ.get(self.STAC_JSON))
31-
self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
32-
self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
22+
self._retrieve_stac_json()
23+
self._setup_download_dir()
3324
self.__edl_token = URSTokenRetriever().start()
3425
return self
3526

@@ -79,7 +70,7 @@ def __download_one_granule(self, assets: dict):
7970
if r.status_code >= 400:
8071
raise RuntimeError(f'wrong response status: {r.status_code}. details: {r.content}')
8172
# TODO. how to correctly check redirecting to login page
82-
with open(os.path.join(self.__download_dir, os.path.basename(v["href"])), 'wb') as fd:
73+
with open(os.path.join(self._download_dir, os.path.basename(v["href"])), 'wb') as fd:
8374
fd.write(r.content)
8475
except Exception as e:
8576
LOGGER.exception(f'failed to download {v}')
@@ -89,15 +80,14 @@ def __download_one_granule(self, assets: dict):
8980

9081
def download(self, **kwargs) -> list:
9182
self.__set_props_from_env()
92-
LOGGER.debug(f'creating download dir: {self.__download_dir}')
93-
FileUtils.mk_dir_p(self.__download_dir)
94-
downloading_urls = self.__get_downloading_urls(self.__granules_json)
83+
LOGGER.debug(f'creating download dir: {self._download_dir}')
84+
downloading_urls = self.__get_downloading_urls(self._granules_json)
9585
error_list = []
9686
for each in downloading_urls:
9787
LOGGER.debug(f'working on {each}')
9888
current_error_list = self.__download_one_granule(each)
9989
error_list.extend(current_error_list)
10090
if len(error_list) > 0:
101-
with open(f'{self.__download_dir}/error.log', 'w') as error_file:
91+
with open(f'{self._download_dir}/error.log', 'w') as error_file:
10292
error_file.write(json.dumps(error_list, indent=4))
10393
return downloading_urls

cumulus_lambda_functions/stage_in_out/download_granules_s3.py

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,42 +4,22 @@
44
import os
55

66
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
7-
from cumulus_lambda_functions.lib.utils.file_utils import FileUtils
87

98
LOGGER = logging.getLogger(__name__)
109

1110

1211
class DownloadGranulesS3(DownloadGranulesAbstract):
13-
DOWNLOAD_DIR_KEY = 'DOWNLOAD_DIR'
14-
STAC_JSON = 'STAC_JSON'
1512

1613
def __init__(self) -> None:
1714
super().__init__()
18-
self.__download_dir = '/tmp'
1915
self.__s3 = AwsS3()
20-
self.__granules_json = []
21-
22-
def __retrieve_stac_json(self):
23-
raw_stac_json = os.environ.get(self.STAC_JSON)
24-
try:
25-
self.__granules_json = json.loads(raw_stac_json)
26-
return self
27-
except:
28-
LOGGER.debug(f'raw_stac_json is not STAC_JSON: {raw_stac_json}. trying to see if file exists')
29-
if not FileUtils.file_exist(raw_stac_json):
30-
raise ValueError(f'missing file or not JSON: {raw_stac_json}')
31-
self.__granules_json = FileUtils.read_json(raw_stac_json)
32-
if self.__granules_json is None:
33-
raise ValueError(f'{raw_stac_json} is not JSON')
34-
return self
3516

3617
def __set_props_from_env(self):
3718
missing_keys = [k for k in [self.STAC_JSON, self.DOWNLOAD_DIR_KEY] if k not in os.environ]
3819
if len(missing_keys) > 0:
3920
raise ValueError(f'missing environment keys: {missing_keys}')
40-
self.__retrieve_stac_json()
41-
self.__download_dir = os.environ.get(self.DOWNLOAD_DIR_KEY)
42-
self.__download_dir = self.__download_dir[:-1] if self.__download_dir.endswith('/') else self.__download_dir
21+
self._retrieve_stac_json()
22+
self._setup_download_dir()
4323
return self
4424

4525
def __get_downloading_urls(self, granules_result: list):
@@ -81,7 +61,7 @@ def __download_one_granule(self, assets: dict):
8161
for k, v in assets.items():
8262
try:
8363
LOGGER.debug(f'downloading: {v["href"]}')
84-
self.__s3.set_s3_url(v['href']).download(self.__download_dir)
64+
self.__s3.set_s3_url(v['href']).download(self._download_dir)
8565
except Exception as e:
8666
LOGGER.exception(f'failed to download {v}')
8767
v['cause'] = str(e)
@@ -90,15 +70,13 @@ def __download_one_granule(self, assets: dict):
9070

9171
def download(self, **kwargs) -> list:
9272
self.__set_props_from_env()
93-
LOGGER.debug(f'creating download dir: {self.__download_dir}')
94-
FileUtils.mk_dir_p(self.__download_dir)
95-
downloading_urls = self.__get_downloading_urls(self.__granules_json)
73+
downloading_urls = self.__get_downloading_urls(self._granules_json)
9674
error_list = []
9775
for each in downloading_urls:
9876
LOGGER.debug(f'working on {each}')
9977
current_error_list = self.__download_one_granule(each)
10078
error_list.extend(current_error_list)
10179
if len(error_list) > 0:
102-
with open(f'{self.__download_dir}/error.log', 'w') as error_file:
80+
with open(f'{self._download_dir}/error.log', 'w') as error_file:
10381
error_file.write(json.dumps(error_list, indent=4))
10482
return downloading_urls
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import json
2+
3+
from cumulus_lambda_functions.cumulus_stac.granules_catalog import GranulesCatalog
4+
from cumulus_lambda_functions.stage_in_out.search_collections_factory import SearchCollectionsFactory
5+
from cumulus_lambda_functions.stage_in_out.upload_granules_abstract import UploadGranulesAbstract
6+
import logging
7+
import os
8+
import re
9+
from cumulus_lambda_functions.lib.aws.aws_s3 import AwsS3
10+
11+
LOGGER = logging.getLogger(__name__)
12+
13+
14+
class UploadGranulesByCatalogS3(UploadGranulesAbstract):
    """Uploads granule files listed in a local STAC catalog to a staging S3 bucket.

    Walks the catalog's child links, uploads each granule's assets, rewrites
    the asset hrefs to the new S3 URLs, and returns DAPA-style granule dicts.
    """

    # Environment variable names (required unless noted otherwise).
    CATALOG_FILE = 'CATALOG_FILE'                      # path to the local STAC catalog JSON
    COLLECTION_ID_KEY = 'COLLECTION_ID'                # target collection id
    STAGING_BUCKET_KEY = 'STAGING_BUCKET'              # destination S3 bucket
    GRANULES_SEARCH_DOMAIN = 'GRANULES_SEARCH_DOMAIN'  # search backend selector

    VERIFY_SSL_KEY = 'VERIFY_SSL'      # optional; defaults to TRUE
    DELETE_FILES_KEY = 'DELETE_FILES'  # optional; defaults to FALSE

    def __init__(self) -> None:
        super().__init__()
        self.__gc = GranulesCatalog()
        self.__collection_id = ''
        self.__collection_details = {}
        self.__staging_bucket = ''
        self.__verify_ssl = True
        self.__delete_files = False
        self.__s3 = AwsS3()

    def __set_props_from_env(self):
        """Validate required env vars and cache their values on the instance.

        Returns self for fluent chaining; raises ValueError listing any
        missing required keys.
        """
        missing_keys = [k for k in [self.CATALOG_FILE, self.COLLECTION_ID_KEY, self.GRANULES_SEARCH_DOMAIN, self.STAGING_BUCKET_KEY] if k not in os.environ]
        if len(missing_keys) > 0:
            raise ValueError(f'missing environment keys: {missing_keys}')

        self.__collection_id = os.environ.get(self.COLLECTION_ID_KEY)
        self.__staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)

        # Flags compare case-insensitively against 'TRUE'; any other value is False.
        self.__verify_ssl = os.environ.get(self.VERIFY_SSL_KEY, 'TRUE').strip().upper() == 'TRUE'
        self.__delete_files = os.environ.get(self.DELETE_FILES_KEY, 'FALSE').strip().upper() == 'TRUE'
        return self

    def upload(self, **kwargs) -> list:
        """Upload every granule referenced by the catalog and return DAPA granule dicts.

        Per-granule failures are collected (not raised) and logged at the end;
        granules without a 'data' asset or whose filename does not match the
        collection's granuleIdExtraction regex are skipped with a warning.
        """
        self.__set_props_from_env()
        # NOTE(review): uses os.getenv directly rather than self.GRANULES_SEARCH_DOMAIN;
        # same key, so behavior matches — the fallback string only surfaces in factory errors.
        self.__collection_details = SearchCollectionsFactory().get_class(os.getenv('GRANULES_SEARCH_DOMAIN', 'MISSING_GRANULES_SEARCH_DOMAIN')).search()
        # presumably search() returns a JSON string of the collection STAC — verify against the factory
        self.__collection_details = json.loads(self.__collection_details)

        granule_id_extraction = self.__collection_details['summaries']['granuleIdExtraction'][0]
        child_links = self.__gc.get_child_link_hrefs(os.environ.get(self.CATALOG_FILE))
        errors = []
        dapa_body_granules = []
        for each_child in child_links:
            try:
                current_granule_stac = self.__gc.get_granules_item(each_child)
                current_assets = self.__gc.extract_assets_href(current_granule_stac)
                if 'data' not in current_assets:
                    LOGGER.warning(f'skipping {each_child}. no data in {current_assets}')
                    continue

                # granule id is extracted from the data file's basename via the collection regex
                current_granule_id = re.findall(granule_id_extraction, os.path.basename(current_assets['data']))
                if len(current_granule_id) < 1:
                    LOGGER.warning(f'skipping {each_child}. cannot be matched to granule_id: {current_granule_id}')
                    continue
                current_granule_id = current_granule_id[0]

                updating_assets = {}
                # set when the granule's own STAC file is among the uploaded assets,
                # so the rewritten metadata can be re-uploaded over it below
                uploading_current_granule_stac = None
                for asset_type, asset_href in current_assets.items():

                    LOGGER.debug(f'uploading {asset_type}, {asset_href}')
                    s3_url = self.__s3.upload(asset_href, self.__staging_bucket, f'{self.__collection_id}:{current_granule_id}', self.__delete_files)
                    if asset_href == each_child:
                        uploading_current_granule_stac = s3_url
                    updating_assets[asset_type] = s3_url
                # point the item's asset hrefs at their new S3 locations
                self.__gc.update_assets_href(current_granule_stac, updating_assets)
                current_granule_stac.id = current_granule_id
                current_granule_stac.collection_id = self.__collection_id
                if uploading_current_granule_stac is not None:  # upload metadata file again
                    self.__s3.set_s3_url(uploading_current_granule_stac)
                    self.__s3.upload_bytes(json.dumps(current_granule_stac.to_dict(False, False)).encode())
                dapa_body_granules.append({
                    'id': f'{self.__collection_id}:{current_granule_id}',
                    'collection': self.__collection_id,
                    'assets': {k: v.to_dict() for k, v in current_granule_stac.assets.items()},
                })
            except Exception as e:
                # best-effort: record the failure and keep processing remaining granules
                LOGGER.exception(f'error while processing: {each_child}')
                errors.append({'href': each_child, 'error': str(e)})

        if len(errors) > 0:
            LOGGER.error(f'some errors while uploading granules: {errors}')
        LOGGER.debug(f'dapa_body_granules: {dapa_body_granules}')
        return dapa_body_granules
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
2-
31
class UploadGranulesFactory:
    """Factory returning the concrete granule uploader for a given upload type."""

    S3 = 'S3'
    CATALOG_S3 = 'CATALOG_S3'

    def get_class(self, upload_type):
        """Return an uploader instance for *upload_type*.

        Imports are deferred so only the selected implementation's
        dependencies are loaded.

        :raises ValueError: for an unrecognized upload_type
        """
        if upload_type == UploadGranulesFactory.S3:
            from cumulus_lambda_functions.stage_in_out.upload_granules_s3 import UploadGranulesS3
            return UploadGranulesS3()
        if upload_type == UploadGranulesFactory.CATALOG_S3:
            from cumulus_lambda_functions.stage_in_out.upload_granules_by_catalog_s3 import UploadGranulesByCatalogS3
            return UploadGranulesByCatalogS3()
        # fix: message previously said 'unknown search_type' (copy-paste from
        # the search factory); this is the upload factory.
        raise ValueError(f'unknown upload_type: {upload_type}')

docker/stage-in-stage-out/dc-003-upload.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ services:
2828
DELETE_FILES: 'FALSE'
2929

3030
GRANULES_SEARCH_DOMAIN: 'UNITY'
31-
GRANULES_UPLOAD_TYPE: 'S3'
32-
UPLOAD_DIR: '/etc/snpp_upload_test_1'
31+
GRANULES_UPLOAD_TYPE: 'S3 or CATALOG_S3'
32+
UPLOAD_DIR: '/etc/snpp_upload_test_1. or empty string'
33+
CATALOG_FILE: 'empty string or /path/to/stac/catalog file'
3334
LOG_LEVEL: '20'
3435
networks:
3536
- internal

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
setup(
2020
name="cumulus_lambda_functions",
21-
version="3.4.0",
21+
version="3.6.0",
2222
packages=find_packages(),
2323
install_requires=install_requires,
2424
tests_require=['mock', 'nose', 'sphinx', 'sphinx_rtd_theme', 'coverage', 'pystac', 'python-dotenv', 'jsonschema'],

0 commit comments

Comments
 (0)