Skip to content

Commit b8d6c38

Browse files
authored
Ensure consistent validation (#368)
**Related Issue(s):**

- #332

**Description:**

- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting.
- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations.

**PR Checklist:**

- [x] Code is formatted and linted (run `pre-commit run --all-files`)
- [x] Tests pass (run `make test`)
- [x] Documentation has been updated to reflect changes, if applicable
- [x] Changes are added to the changelog
1 parent b4f0227 commit b8d6c38

File tree

7 files changed

+218
-134
lines changed

7 files changed

+218
-134
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1818
- Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326)
1919
- Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182)
2020
- Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75)
21+
- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
22+
2123

2224
### Fixed
2325

26+
- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
27+
2428
## [v4.0.0] - 2025-04-23
2529

2630
### Added

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ You can customize additional settings in your `.env` file:
114114
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
115115
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
116116
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
117-
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional |
117+
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
118118

119119
> [!NOTE]
120120
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -693,19 +693,23 @@ async def create_item(
693693
NotFoundError: If the specified collection is not found in the database.
694694
ConflictError: If an item with the same ID already exists in the collection.
695695
"""
696-
item = item.model_dump(mode="json")
697-
base_url = str(kwargs["request"].base_url)
696+
request = kwargs.get("request")
697+
base_url = str(request.base_url)
698+
699+
# Convert Pydantic model to dict for uniform processing
700+
item_dict = item.model_dump(mode="json")
698701

699-
# If a feature collection is posted
700-
if item["type"] == "FeatureCollection":
702+
# Handle FeatureCollection (bulk insert)
703+
if item_dict["type"] == "FeatureCollection":
701704
bulk_client = BulkTransactionsClient(
702705
database=self.database, settings=self.settings
703706
)
707+
features = item_dict["features"]
704708
processed_items = [
705709
bulk_client.preprocess_item(
706-
item, base_url, BulkTransactionMethod.INSERT
710+
feature, base_url, BulkTransactionMethod.INSERT
707711
)
708-
for item in item["features"]
712+
for feature in features
709713
]
710714
attempted = len(processed_items)
711715
success, errors = await self.database.bulk_async(
@@ -714,17 +718,23 @@ async def create_item(
714718
refresh=kwargs.get("refresh", False),
715719
)
716720
if errors:
717-
logger.error(f"Bulk async operation encountered errors: {errors}")
721+
logger.error(
722+
f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})"
723+
)
718724
else:
719-
logger.info(f"Bulk async operation succeeded with {success} actions.")
720-
725+
logger.info(
726+
f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
727+
)
721728
return f"Successfully added {success} Items. {attempted - success} errors occurred."
722-
else:
723-
item = await self.database.async_prep_create_item(
724-
item=item, base_url=base_url
725-
)
726-
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
727-
return ItemSerializer.db_to_stac(item, base_url)
729+
730+
# Handle single item
731+
await self.database.create_item(
732+
item_dict,
733+
refresh=kwargs.get("refresh", False),
734+
base_url=base_url,
735+
exist_ok=False,
736+
)
737+
return ItemSerializer.db_to_stac(item_dict, base_url)
728738

729739
@overrides
730740
async def update_item(
@@ -750,8 +760,9 @@ async def update_item(
750760
now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
751761
item["properties"]["updated"] = now
752762

753-
await self.database.check_collection_exists(collection_id)
754-
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
763+
await self.database.create_item(
764+
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
765+
)
755766

756767
return ItemSerializer.db_to_stac(item, base_url)
757768

@@ -908,12 +919,19 @@ def bulk_item_insert(
908919
else:
909920
base_url = ""
910921

911-
processed_items = [
912-
self.preprocess_item(item, base_url, items.method)
913-
for item in items.items.values()
914-
]
922+
processed_items = []
923+
for item in items.items.values():
924+
try:
925+
validated = Item(**item) if not isinstance(item, Item) else item
926+
processed_items.append(
927+
self.preprocess_item(
928+
validated.model_dump(mode="json"), base_url, items.method
929+
)
930+
)
931+
except ValidationError:
932+
# Immediately raise on the first invalid item (strict mode)
933+
raise
915934

916-
# not a great way to get the collection_id-- should be part of the method signature
917935
collection_id = processed_items[0]["collection"]
918936
attempted = len(processed_items)
919937
success, errors = self.database.bulk_sync(

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,13 @@ def bulk_sync_prep_create_item(
842842
logger.debug(f"Item {item['id']} prepared successfully.")
843843
return prepped_item
844844

845-
async def create_item(self, item: Item, refresh: bool = False):
845+
async def create_item(
846+
self,
847+
item: Item,
848+
refresh: bool = False,
849+
base_url: str = "",
850+
exist_ok: bool = False,
851+
):
846852
"""Database logic for creating one item.
847853
848854
Args:
@@ -858,18 +864,16 @@ async def create_item(self, item: Item, refresh: bool = False):
858864
# todo: check if collection exists, but cache
859865
item_id = item["id"]
860866
collection_id = item["collection"]
861-
es_resp = await self.client.index(
867+
item = await self.async_prep_create_item(
868+
item=item, base_url=base_url, exist_ok=exist_ok
869+
)
870+
await self.client.index(
862871
index=index_alias_by_collection_id(collection_id),
863872
id=mk_item_id(item_id, collection_id),
864873
document=item,
865874
refresh=refresh,
866875
)
867876

868-
if (meta := es_resp.get("meta")) and meta.get("status") == 409:
869-
raise ConflictError(
870-
f"Item {item_id} in collection {collection_id} already exists"
871-
)
872-
873877
async def delete_item(
874878
self, item_id: str, collection_id: str, refresh: bool = False
875879
):

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,13 @@ def bulk_sync_prep_create_item(
861861
logger.debug(f"Item {item['id']} prepared successfully.")
862862
return prepped_item
863863

864-
async def create_item(self, item: Item, refresh: bool = False):
864+
async def create_item(
865+
self,
866+
item: Item,
867+
refresh: bool = False,
868+
base_url: str = "",
869+
exist_ok: bool = False,
870+
):
865871
"""Database logic for creating one item.
866872
867873
Args:
@@ -877,18 +883,16 @@ async def create_item(self, item: Item, refresh: bool = False):
877883
# todo: check if collection exists, but cache
878884
item_id = item["id"]
879885
collection_id = item["collection"]
880-
es_resp = await self.client.index(
886+
item = await self.async_prep_create_item(
887+
item=item, base_url=base_url, exist_ok=exist_ok
888+
)
889+
await self.client.index(
881890
index=index_alias_by_collection_id(collection_id),
882891
id=mk_item_id(item_id, collection_id),
883892
body=item,
884893
refresh=refresh,
885894
)
886895

887-
if (meta := es_resp.get("meta")) and meta.get("status") == 409:
888-
raise ConflictError(
889-
f"Item {item_id} in collection {collection_id} already exists"
890-
)
891-
892896
async def delete_item(
893897
self, item_id: str, collection_id: str, refresh: bool = False
894898
):
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import os
2+
import uuid
3+
from copy import deepcopy
4+
5+
import pytest
6+
from pydantic import ValidationError
7+
8+
from stac_fastapi.extensions.third_party.bulk_transactions import Items
9+
from stac_fastapi.types.errors import ConflictError
10+
11+
from ..conftest import MockRequest, create_item
12+
13+
if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
14+
from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings
15+
else:
16+
from stac_fastapi.elasticsearch.config import (
17+
ElasticsearchSettings as SearchSettings,
18+
)
19+
20+
21+
@pytest.mark.asyncio
22+
async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client):
23+
items = {}
24+
for _ in range(10):
25+
_item = deepcopy(ctx.item)
26+
_item["id"] = str(uuid.uuid4())
27+
items[_item["id"]] = _item
28+
29+
# fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
30+
# assert len(fc["features"]) == 0
31+
32+
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)
33+
34+
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
35+
assert len(fc["features"]) >= 10
36+
37+
38+
@pytest.mark.asyncio
39+
async def test_bulk_item_insert_with_raise_on_error(
40+
ctx, core_client, txn_client, bulk_txn_client
41+
):
42+
"""
43+
Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false.
44+
45+
This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError
46+
is raised for conflicting items. When set to false, the operation logs errors
47+
and continues gracefully.
48+
"""
49+
50+
# Insert an initial item to set up a conflict
51+
initial_item = deepcopy(ctx.item)
52+
initial_item["id"] = str(uuid.uuid4())
53+
await create_item(txn_client, initial_item)
54+
55+
# Verify the initial item is inserted
56+
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
57+
assert len(fc["features"]) >= 1
58+
59+
# Create conflicting items (same ID as the initial item)
60+
conflicting_items = {initial_item["id"]: deepcopy(initial_item)}
61+
62+
# Test with RAISE_ON_BULK_ERROR set to true
63+
os.environ["RAISE_ON_BULK_ERROR"] = "true"
64+
bulk_txn_client.database.sync_settings = SearchSettings()
65+
66+
with pytest.raises(ConflictError):
67+
bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True)
68+
69+
# Test with RAISE_ON_BULK_ERROR set to false
70+
os.environ["RAISE_ON_BULK_ERROR"] = "false"
71+
bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings
72+
result = bulk_txn_client.bulk_item_insert(
73+
Items(items=conflicting_items), refresh=True
74+
)
75+
76+
# Validate the results
77+
assert "Successfully added/updated 1 Items" in result
78+
79+
# Clean up the inserted item
80+
await txn_client.delete_item(initial_item["id"], ctx.item["collection"])
81+
82+
83+
@pytest.mark.asyncio
84+
async def test_feature_collection_insert(
85+
core_client,
86+
txn_client,
87+
ctx,
88+
):
89+
features = []
90+
for _ in range(10):
91+
_item = deepcopy(ctx.item)
92+
_item["id"] = str(uuid.uuid4())
93+
features.append(_item)
94+
95+
feature_collection = {"type": "FeatureCollection", "features": features}
96+
97+
await create_item(txn_client, feature_collection)
98+
99+
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
100+
assert len(fc["features"]) >= 10
101+
102+
103+
@pytest.mark.asyncio
104+
async def test_bulk_item_insert_validation_error(ctx, core_client, bulk_txn_client):
105+
items = {}
106+
# Add 9 valid items
107+
for _ in range(9):
108+
_item = deepcopy(ctx.item)
109+
_item["id"] = str(uuid.uuid4())
110+
items[_item["id"]] = _item
111+
112+
# Add 1 invalid item (e.g., missing "datetime")
113+
invalid_item = deepcopy(ctx.item)
114+
invalid_item["id"] = str(uuid.uuid4())
115+
invalid_item["properties"].pop(
116+
"datetime", None
117+
) # Remove datetime to make it invalid
118+
items[invalid_item["id"]] = invalid_item
119+
120+
# The bulk insert should raise a ValidationError due to the invalid item
121+
with pytest.raises(ValidationError):
122+
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)
123+
124+
125+
@pytest.mark.asyncio
126+
async def test_feature_collection_insert_validation_error(
127+
core_client,
128+
txn_client,
129+
ctx,
130+
):
131+
features = []
132+
# Add 9 valid items
133+
for _ in range(9):
134+
_item = deepcopy(ctx.item)
135+
_item["id"] = str(uuid.uuid4())
136+
features.append(_item)
137+
138+
# Add 1 invalid item (e.g., missing "datetime")
139+
invalid_item = deepcopy(ctx.item)
140+
invalid_item["id"] = str(uuid.uuid4())
141+
invalid_item["properties"].pop(
142+
"datetime", None
143+
) # Remove datetime to make it invalid
144+
features.append(invalid_item)
145+
146+
feature_collection = {"type": "FeatureCollection", "features": features}
147+
148+
# Assert that a ValidationError is raised due to the invalid item
149+
with pytest.raises(ValidationError):
150+
await create_item(txn_client, feature_collection)

0 commit comments

Comments
 (0)