Commit b4f0227

Enable choice of raising bulk insert errors, logging (#364)
**Related Issue(s):**

- #346

**Description:**

- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations.
- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`).

**PR Checklist:**

- [x] Code is formatted and linted (run `pre-commit run --all-files`)
- [x] Tests pass (run `make test`)
- [x] Documentation has been updated to reflect changes, if applicable
- [x] Changes are added to the changelog
1 parent 70a18f5 commit b4f0227

9 files changed: 392 additions and 131 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Added
 
+- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
+- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
 - Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87)
 
 ### Changed

README.md

Lines changed: 2 additions & 1 deletion
@@ -113,7 +113,8 @@ You can customize additional settings in your `.env` file:
 | `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
 | `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
 | `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
-| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
+| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
+| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional |
 
 > [!NOTE]
 > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
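
The two modes described for `RAISE_ON_BULK_ERROR` can be illustrated with a small, self-contained sketch. It loosely mirrors the duplicate-item check that this commit adds to the bulk prep methods, but it is not the project's code: `check_conflict`, the in-memory `existing` set, and the local `ConflictError` class are hypothetical stand-ins for the database lookup and the real exception type.

```python
import logging
from typing import Set, Tuple

logger = logging.getLogger(__name__)


class ConflictError(Exception):
    """Local stand-in for the project's ConflictError (assumption)."""


def check_conflict(
    item: dict,
    existing: Set[Tuple[str, str]],
    exist_ok: bool,
    raise_on_bulk_error: bool,
) -> bool:
    """Return True if a duplicate was found and only logged, False if no conflict.

    `existing` is a hypothetical set of (collection_id, item_id) pairs that
    replaces the `client.exists(...)` call used in the real code.
    """
    key = (item["collection"], item["id"])
    if exist_ok or key not in existing:
        return False

    message = f"Item {item['id']} in collection {item['collection']} already exists."
    if raise_on_bulk_error:
        # Flag set to true: stop and surface the error to the caller.
        raise ConflictError(message)

    # Flag set to false: log a warning and let processing continue.
    logger.warning("%s Continuing as `RAISE_ON_BULK_ERROR` is set to false.", message)
    return True
```

With `raise_on_bulk_error=False` the caller keeps going after a duplicate, so any remaining failure is left to surface in the result of the bulk request itself.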

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 34 additions & 19 deletions
@@ -676,21 +676,22 @@ class TransactionsClient(AsyncBaseTransactionsClient):
     @overrides
     async def create_item(
         self, collection_id: str, item: Union[Item, ItemCollection], **kwargs
-    ) -> Optional[stac_types.Item]:
-        """Create an item in the collection.
+    ) -> Union[stac_types.Item, str]:
+        """
+        Create an item or a feature collection of items in the specified collection.
 
         Args:
-            collection_id (str): The id of the collection to add the item to.
-            item (stac_types.Item): The item to be added to the collection.
-            kwargs: Additional keyword arguments.
+            collection_id (str): The ID of the collection to add the item(s) to.
+            item (Union[Item, ItemCollection]): A single item or a collection of items to be added.
+            **kwargs: Additional keyword arguments, such as `request` and `refresh`.
 
         Returns:
-            stac_types.Item: The created item.
+            Union[stac_types.Item, str]: The created item if a single item is added, or a summary string
+            indicating the number of items successfully added and errors if a collection of items is added.
 
         Raises:
-            NotFound: If the specified collection is not found in the database.
-            ConflictError: If the item in the specified collection already exists.
-
+            NotFoundError: If the specified collection is not found in the database.
+            ConflictError: If an item with the same ID already exists in the collection.
         """
         item = item.model_dump(mode="json")
         base_url = str(kwargs["request"].base_url)
@@ -706,14 +707,22 @@ async def create_item(
                 )
                 for item in item["features"]
             ]
-
-            await self.database.bulk_async(
-                collection_id, processed_items, refresh=kwargs.get("refresh", False)
+            attempted = len(processed_items)
+            success, errors = await self.database.bulk_async(
+                collection_id,
+                processed_items,
+                refresh=kwargs.get("refresh", False),
             )
+            if errors:
+                logger.error(f"Bulk async operation encountered errors: {errors}")
+            else:
+                logger.info(f"Bulk async operation succeeded with {success} actions.")
 
-            return None
+            return f"Successfully added {success} Items. {attempted - success} errors occurred."
         else:
-            item = await self.database.prep_create_item(item=item, base_url=base_url)
+            item = await self.database.async_prep_create_item(
+                item=item, base_url=base_url
+            )
             await self.database.create_item(item, refresh=kwargs.get("refresh", False))
             return ItemSerializer.db_to_stac(item, base_url)

@@ -875,7 +884,7 @@ def preprocess_item(
             The preprocessed item.
         """
         exist_ok = method == BulkTransactionMethod.UPSERT
-        return self.database.sync_prep_create_item(
+        return self.database.bulk_sync_prep_create_item(
             item=item, base_url=base_url, exist_ok=exist_ok
         )

@@ -906,12 +915,18 @@ def bulk_item_insert(
 
         # not a great way to get the collection_id-- should be part of the method signature
         collection_id = processed_items[0]["collection"]
-
-        self.database.bulk_sync(
-            collection_id, processed_items, refresh=kwargs.get("refresh", False)
+        attempted = len(processed_items)
+        success, errors = self.database.bulk_sync(
+            collection_id,
+            processed_items,
+            refresh=kwargs.get("refresh", False),
         )
+        if errors:
+            logger.error(f"Bulk sync operation encountered errors: {errors}")
+        else:
+            logger.info(f"Bulk sync operation succeeded with {success} actions.")
 
-        return f"Successfully added {len(processed_items)} Items."
+        return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."
 
 
 _DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = {
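
Both bulk paths in `core.py` now build their response from the `(success, errors)` pair returned by the database layer, and they derive the reported error count as `attempted - success` rather than from the length of `errors`. A minimal sketch of that reporting step follows; `summarize_bulk` is a hypothetical helper name used only for illustration, since the commit inlines this logic in each method.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


def summarize_bulk(
    attempted: int, success: int, errors: List[Dict[str, Any]]
) -> str:
    """Log the bulk outcome and return the summary string sent to the client."""
    if errors:
        logger.error(f"Bulk operation encountered errors: {errors}")
    else:
        logger.info(f"Bulk operation succeeded with {success} actions.")

    # The error count is derived from the totals, not from len(errors).
    failed = attempted - success
    return f"Successfully added {success} Items. {failed} errors occurred."
```

For example, three attempted items with two successes yield `"Successfully added 2 Items. 1 errors occurred."`.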

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py

Lines changed: 2 additions & 0 deletions
@@ -86,6 +86,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
     indexed_fields: Set[str] = {"datetime"}
     enable_response_models: bool = False
     enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
+    raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
 
     @property
     def create_client(self):
@@ -106,6 +107,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
     indexed_fields: Set[str] = {"datetime"}
     enable_response_models: bool = False
     enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
+    raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
 
     @property
     def create_client(self):
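
The new setting is read through `get_bool_env`, whose implementation is not part of this diff. As a rough illustration of what such a helper might do, here is a sketch under stated assumptions: the name `read_bool_env`, the accepted spellings, and the fall-back-to-default behavior for unrecognized values are all hypothetical and may differ from the project's helper.

```python
import os

# Accepted spellings are an assumption made for this sketch.
_TRUE_VALUES = {"true", "1", "yes", "y"}
_FALSE_VALUES = {"false", "0", "no", "n"}


def read_bool_env(name: str, default: bool = False) -> bool:
    """Read an environment variable and interpret it as a boolean."""
    raw = os.environ.get(name)
    if raw is None:
        return default

    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    # Unrecognized values fall back to the default in this sketch.
    return default
```

Because the setting is a class-level default evaluated when the settings class is defined, the variable would need to be set before the module is imported for it to take effect.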

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 137 additions & 41 deletions
@@ -128,8 +128,20 @@ async def delete_item_index(collection_id: str):
 class DatabaseLogic(BaseDatabaseLogic):
     """Database logic."""
 
-    client = AsyncElasticsearchSettings().create_client
-    sync_client = SyncElasticsearchSettings().create_client
+    async_settings: AsyncElasticsearchSettings = attr.ib(
+        factory=AsyncElasticsearchSettings
+    )
+    sync_settings: SyncElasticsearchSettings = attr.ib(
+        factory=SyncElasticsearchSettings
+    )
+
+    client = attr.ib(init=False)
+    sync_client = attr.ib(init=False)
+
+    def __attrs_post_init__(self):
+        """Initialize clients after the class is instantiated."""
+        self.client = self.async_settings.create_client
+        self.sync_client = self.sync_settings.create_client
 
     item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
     collection_serializer: Type[CollectionSerializer] = attr.ib(
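
The hunk above replaces class-level clients with per-instance settings objects plus a post-init hook that derives the clients from them, which is what lets the bulk methods later read `raise_on_bulk_error` from `self.async_settings` or `self.sync_settings`. The same pattern can be sketched with the standard library's `dataclasses` instead of `attrs` (a deliberate substitution so the example runs without third-party packages); `FakeSettings` and `FakeDatabaseLogic` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeSettings:
    """Hypothetical stand-in for the Elasticsearch settings classes."""

    raise_on_bulk_error: bool = False

    @property
    def create_client(self) -> dict:
        # A real settings class would build a client; a dict keeps this runnable.
        return {"raise_on_bulk_error": self.raise_on_bulk_error}


@dataclass
class FakeDatabaseLogic:
    # default_factory plays the role of attr.ib(factory=...):
    # every instance gets its own settings object unless one is injected.
    settings: FakeSettings = field(default_factory=FakeSettings)
    # init=False plays the role of attr.ib(init=False).
    client: Any = field(init=False)

    def __post_init__(self) -> None:
        # Counterpart of __attrs_post_init__: derive the client from settings.
        self.client = self.settings.create_client
```

One practical consequence of this design is that settings can be injected per instance, for example `FakeDatabaseLogic(settings=FakeSettings(raise_on_bulk_error=True))` in a test, instead of being fixed when the class body is evaluated.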
@@ -699,7 +711,7 @@ async def check_collection_exists(self, collection_id: str):
         if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
             raise NotFoundError(f"Collection {collection_id} does not exist")
 
-    async def prep_create_item(
+    async def async_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
     ) -> Item:
         """
@@ -729,42 +741,106 @@ async def prep_create_item(
 
         return self.item_serializer.stac_to_db(item, base_url)
 
-    def sync_prep_create_item(
+    async def bulk_async_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
     ) -> Item:
         """
         Prepare an item for insertion into the database.
 
-        This method performs pre-insertion preparation on the given `item`,
-        such as checking if the collection the item belongs to exists,
-        and optionally verifying that an item with the same ID does not already exist in the database.
+        This method performs pre-insertion preparation on the given `item`, such as:
+        - Verifying that the collection the item belongs to exists.
+        - Optionally checking if an item with the same ID already exists in the database.
+        - Serializing the item into a database-compatible format.
 
         Args:
-            item (Item): The item to be inserted into the database.
-            base_url (str): The base URL used for constructing URLs for the item.
-            exist_ok (bool): Indicates whether the item can exist already.
+            item (Item): The item to be prepared for insertion.
+            base_url (str): The base URL used to construct the item's self URL.
+            exist_ok (bool): Indicates whether the item can already exist in the database.
+                If False, a `ConflictError` is raised if the item exists.
 
         Returns:
-            Item: The item after preparation is done.
+            Item: The prepared item, serialized into a database-compatible format.
 
         Raises:
             NotFoundError: If the collection that the item belongs to does not exist in the database.
-            ConflictError: If an item with the same ID already exists in the collection.
+            ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
+                and `RAISE_ON_BULK_ERROR` is set to `true`.
         """
-        item_id = item["id"]
-        collection_id = item["collection"]
-        if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
-            raise NotFoundError(f"Collection {collection_id} does not exist")
+        logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
 
-        if not exist_ok and self.sync_client.exists(
-            index=index_alias_by_collection_id(collection_id),
-            id=mk_item_id(item_id, collection_id),
+        # Check if the collection exists
+        await self.check_collection_exists(collection_id=item["collection"])
+
+        # Check if the item already exists in the database
+        if not exist_ok and await self.client.exists(
+            index=index_alias_by_collection_id(item["collection"]),
+            id=mk_item_id(item["id"], item["collection"]),
         ):
-            raise ConflictError(
-                f"Item {item_id} in collection {collection_id} already exists"
+            error_message = (
+                f"Item {item['id']} in collection {item['collection']} already exists."
             )
+            if self.async_settings.raise_on_bulk_error:
+                raise ConflictError(error_message)
+            else:
+                logger.warning(
+                    f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
+                )
+
+        # Serialize the item into a database-compatible format
+        prepped_item = self.item_serializer.stac_to_db(item, base_url)
+        logger.debug(f"Item {item['id']} prepared successfully.")
+        return prepped_item
+
+    def bulk_sync_prep_create_item(
+        self, item: Item, base_url: str, exist_ok: bool = False
+    ) -> Item:
+        """
+        Prepare an item for insertion into the database.
 
-        return self.item_serializer.stac_to_db(item, base_url)
+        This method performs pre-insertion preparation on the given `item`, such as:
+        - Verifying that the collection the item belongs to exists.
+        - Optionally checking if an item with the same ID already exists in the database.
+        - Serializing the item into a database-compatible format.
+
+        Args:
+            item (Item): The item to be prepared for insertion.
+            base_url (str): The base URL used to construct the item's self URL.
+            exist_ok (bool): Indicates whether the item can already exist in the database.
+                If False, a `ConflictError` is raised if the item exists.
+
+        Returns:
+            Item: The prepared item, serialized into a database-compatible format.
+
+        Raises:
+            NotFoundError: If the collection that the item belongs to does not exist in the database.
+            ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
+                and `RAISE_ON_BULK_ERROR` is set to `true`.
+        """
+        logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
+
+        # Check if the collection exists
+        if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
+            raise NotFoundError(f"Collection {item['collection']} does not exist")
+
+        # Check if the item already exists in the database
+        if not exist_ok and self.sync_client.exists(
+            index=index_alias_by_collection_id(item["collection"]),
+            id=mk_item_id(item["id"], item["collection"]),
+        ):
+            error_message = (
+                f"Item {item['id']} in collection {item['collection']} already exists."
+            )
+            if self.sync_settings.raise_on_bulk_error:
+                raise ConflictError(error_message)
+            else:
+                logger.warning(
+                    f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
+                )
+
+        # Serialize the item into a database-compatible format
+        prepped_item = self.item_serializer.stac_to_db(item, base_url)
+        logger.debug(f"Item {item['id']} prepared successfully.")
+        return prepped_item
 
     async def create_item(self, item: Item, refresh: bool = False):
         """Database logic for creating one item.
@@ -959,52 +1035,72 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
         await delete_item_index(collection_id)
 
     async def bulk_async(
-        self, collection_id: str, processed_items: List[Item], refresh: bool = False
-    ) -> None:
-        """Perform a bulk insert of items into the database asynchronously.
+        self,
+        collection_id: str,
+        processed_items: List[Item],
+        refresh: bool = False,
+    ) -> Tuple[int, List[Dict[str, Any]]]:
+        """
+        Perform a bulk insert of items into the database asynchronously.
 
         Args:
-            self: The instance of the object calling this function.
             collection_id (str): The ID of the collection to which the items belong.
             processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
             refresh (bool): Whether to refresh the index after the bulk insert (default: False).
 
+        Returns:
+            Tuple[int, List[Dict[str, Any]]]: A tuple containing:
+                - The number of successfully processed actions (`success`).
+                - A list of errors encountered during the bulk operation (`errors`).
+
         Notes:
-            This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
-            insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The
-            `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
-            index is refreshed after the bulk insert. The function does not return any value.
+            This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
+            The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
+            The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
+            the index is refreshed after the bulk insert.
         """
-        await helpers.async_bulk(
+        raise_on_error = self.async_settings.raise_on_bulk_error
+        success, errors = await helpers.async_bulk(
             self.client,
             mk_actions(collection_id, processed_items),
             refresh=refresh,
-            raise_on_error=False,
+            raise_on_error=raise_on_error,
         )
+        return success, errors
 
     def bulk_sync(
-        self, collection_id: str, processed_items: List[Item], refresh: bool = False
-    ) -> None:
-        """Perform a bulk insert of items into the database synchronously.
+        self,
+        collection_id: str,
+        processed_items: List[Item],
+        refresh: bool = False,
+    ) -> Tuple[int, List[Dict[str, Any]]]:
+        """
+        Perform a bulk insert of items into the database synchronously.
 
         Args:
-            self: The instance of the object calling this function.
             collection_id (str): The ID of the collection to which the items belong.
             processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
             refresh (bool): Whether to refresh the index after the bulk insert (default: False).
 
+        Returns:
+            Tuple[int, List[Dict[str, Any]]]: A tuple containing:
+                - The number of successfully processed actions (`success`).
+                - A list of errors encountered during the bulk operation (`errors`).
+
         Notes:
-            This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
-            insert is performed synchronously and blocking, meaning that the function does not return until the insert has
+            This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
+            The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
             completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
-            True, the index is refreshed after the bulk insert. The function does not return any value.
+            True, the index is refreshed after the bulk insert.
         """
-        helpers.bulk(
+        raise_on_error = self.sync_settings.raise_on_bulk_error
+        success, errors = helpers.bulk(
             self.sync_client,
             mk_actions(collection_id, processed_items),
             refresh=refresh,
-            raise_on_error=False,
+            raise_on_error=raise_on_error,
         )
+        return success, errors
 
     # DANGER
     async def delete_items(self) -> None:
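
`bulk_async` and `bulk_sync` now pass the configured flag to the client's bulk helpers and return their `(success, errors)` result. In the Elasticsearch Python client, `helpers.bulk` returns the number of successful actions together with a list of error entries by default, and raises `BulkIndexError` on failures when `raise_on_error` is true. The sketch below imitates only that contract so it can run without a cluster: `fake_bulk`, `FakeBulkIndexError`, and the in-memory `existing_ids` set are hypothetical, and the real helper processes actions in chunks, so documents indexed before a failure may still be written.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple


class FakeBulkIndexError(Exception):
    """Hypothetical stand-in for the client's BulkIndexError."""

    def __init__(self, message: str, errors: List[Dict[str, Any]]):
        super().__init__(message)
        self.errors = errors


def fake_bulk(
    actions: Iterable[Dict[str, Any]],
    existing_ids: Set[str],
    raise_on_error: bool,
) -> Tuple[int, List[Dict[str, Any]]]:
    """Imitate the (success, errors) contract of a bulk helper."""
    success = 0
    errors: List[Dict[str, Any]] = []
    for action in actions:
        if action["_id"] in existing_ids:
            # Treat a duplicate ID as a version conflict.
            errors.append({"create": {"_id": action["_id"], "status": 409}})
        else:
            existing_ids.add(action["_id"])
            success += 1

    if errors and raise_on_error:
        raise FakeBulkIndexError(f"{len(errors)} document(s) failed to index.", errors)
    return success, errors
```

With `raise_on_error=False`, a batch containing one duplicate returns a count one lower than the batch size plus a single error entry, which is the shape the callers in `core.py` use to build their summary string.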
