Ensure consistent validation #368

Merged 6 commits on May 7, 2025
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -18,9 +18,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326)
- Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182)
- Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75)
- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
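  For illustration, a minimal sketch of the new contract (assuming the Pydantic model in play is `stac_pydantic`'s `Item`; the incomplete item below is made up):

  ```python
  from pydantic import ValidationError
  from stac_pydantic import Item

  # An item missing required STAC fields (geometry, properties.datetime, ...).
  bad_item = {"id": "no-datetime", "type": "Feature"}

  try:
      Item(**bad_item)  # the same per-item check bulk insertion now applies
  except ValidationError as exc:
      # With this change, bulk_item_insert re-raises immediately on the first
      # invalid item, even when RAISE_ON_BULK_ERROR=false.
      print(f"rejected: {exc.errors()[0]['msg']}")
  ```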


### Fixed

- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)

## [v4.0.0] - 2025-04-23

### Added
2 changes: 1 addition & 1 deletion README.md
@@ -114,7 +114,7 @@ You can customize additional settings in your `.env` file:
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
- | `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional |
+ | `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
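
As a rough sketch of how the flag is picked up (this mirrors the pattern used in the new tests below; the Elasticsearch backend is assumed here):

```python
import os

from stac_fastapi.elasticsearch.config import ElasticsearchSettings

# The flag is read when the settings object is instantiated, so set the
# environment variable first and then (re)build the settings.
os.environ["RAISE_ON_BULK_ERROR"] = "true"
settings = ElasticsearchSettings()
```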
62 changes: 40 additions & 22 deletions stac_fastapi/core/stac_fastapi/core/core.py
@@ -693,19 +693,23 @@ async def create_item(
NotFoundError: If the specified collection is not found in the database.
ConflictError: If an item with the same ID already exists in the collection.
"""
-        item = item.model_dump(mode="json")
-        base_url = str(kwargs["request"].base_url)
+        request = kwargs.get("request")
+        base_url = str(request.base_url)

+        # Convert Pydantic model to dict for uniform processing
+        item_dict = item.model_dump(mode="json")
+
-        # If a feature collection is posted
-        if item["type"] == "FeatureCollection":
+        # Handle FeatureCollection (bulk insert)
+        if item_dict["type"] == "FeatureCollection":
            bulk_client = BulkTransactionsClient(
                database=self.database, settings=self.settings
            )
+            features = item_dict["features"]
            processed_items = [
                bulk_client.preprocess_item(
-                    item, base_url, BulkTransactionMethod.INSERT
+                    feature, base_url, BulkTransactionMethod.INSERT
                )
-                for item in item["features"]
+                for feature in features
            ]
]
attempted = len(processed_items)
success, errors = await self.database.bulk_async(
@@ -714,17 +718,23 @@
refresh=kwargs.get("refresh", False),
)
            if errors:
-                logger.error(f"Bulk async operation encountered errors: {errors}")
+                logger.error(
+                    f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})"
+                )
            else:
-                logger.info(f"Bulk async operation succeeded with {success} actions.")
-
+                logger.info(
+                    f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
+                )
            return f"Successfully added {success} Items. {attempted - success} errors occurred."
-        else:
-            item = await self.database.async_prep_create_item(
-                item=item, base_url=base_url
-            )
-            await self.database.create_item(item, refresh=kwargs.get("refresh", False))
-            return ItemSerializer.db_to_stac(item, base_url)

+        # Handle single item
+        await self.database.create_item(
+            item_dict,
+            refresh=kwargs.get("refresh", False),
+            base_url=base_url,
+            exist_ok=False,
+        )
+        return ItemSerializer.db_to_stac(item_dict, base_url)

@overrides
async def update_item(
@@ -750,8 +760,9 @@
now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
item["properties"]["updated"] = now

-        await self.database.check_collection_exists(collection_id)
-        await self.database.create_item(item, refresh=kwargs.get("refresh", False))
+        await self.database.create_item(
+            item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
+        )

return ItemSerializer.db_to_stac(item, base_url)
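
Seen from the API, the unified path means create and update now differ only in conflict handling (`exist_ok`). A hypothetical client-side sketch (the host, collection id, and item body are assumptions, not taken from this PR):

```python
import httpx

BASE = "http://localhost:8080"
item = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "id": "item-1",
    "collection": "demo",
    "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
    "bbox": [0.0, 0.0, 0.0, 0.0],
    "properties": {"datetime": "2025-01-01T00:00:00Z"},
    "assets": {},
    "links": [],
}

# POST -> create_item with exist_ok=False: repeating the same id is a conflict.
httpx.post(f"{BASE}/collections/demo/items", json=item)

# PUT -> update_item with exist_ok=True: overwrites through the same
# database.create_item code path.
httpx.put(f"{BASE}/collections/demo/items/{item['id']}", json=item)
```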

@@ -908,12 +919,19 @@ def bulk_item_insert(
else:
base_url = ""

-        processed_items = [
-            self.preprocess_item(item, base_url, items.method)
-            for item in items.items.values()
-        ]
+        processed_items = []
+        for item in items.items.values():
+            try:
+                validated = Item(**item) if not isinstance(item, Item) else item
+                processed_items.append(
+                    self.preprocess_item(
+                        validated.model_dump(mode="json"), base_url, items.method
+                    )
+                )
+            except ValidationError:
+                # Immediately raise on the first invalid item (strict mode)
+                raise

# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]
attempted = len(processed_items)
success, errors = self.database.bulk_sync(
stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
@@ -842,7 +842,13 @@ def bulk_sync_prep_create_item(
logger.debug(f"Item {item['id']} prepared successfully.")
return prepped_item

-    async def create_item(self, item: Item, refresh: bool = False):
+    async def create_item(
+        self,
+        item: Item,
+        refresh: bool = False,
+        base_url: str = "",
+        exist_ok: bool = False,
+    ):
"""Database logic for creating one item.
Args:
@@ -858,18 +864,16 @@ async def create_item(self, item: Item, refresh: bool = False):
# todo: check if collection exists, but cache
item_id = item["id"]
collection_id = item["collection"]
-        es_resp = await self.client.index(
+        item = await self.async_prep_create_item(
+            item=item, base_url=base_url, exist_ok=exist_ok
+        )
+        await self.client.index(
            index=index_alias_by_collection_id(collection_id),
            id=mk_item_id(item_id, collection_id),
            document=item,
            refresh=refresh,
        )

-        if (meta := es_resp.get("meta")) and meta.get("status") == 409:
-            raise ConflictError(
-                f"Item {item_id} in collection {collection_id} already exists"
-            )
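
With the inline 409 check gone, conflict detection presumably moves into `async_prep_create_item` behind the new `exist_ok` flag (its body is not shown in this diff). A hypothetical sketch of that check, not the actual implementation, reusing the module's `index_alias_by_collection_id` and `mk_item_id` helpers:

```python
from stac_fastapi.types.errors import ConflictError

async def _ensure_item_absent(client, item_id: str, collection_id: str, exist_ok: bool):
    """Hypothetical helper: raise ConflictError unless overwrites are allowed."""
    if exist_ok:
        return  # update path: overwriting an existing document is fine
    if await client.exists(
        index=index_alias_by_collection_id(collection_id),
        id=mk_item_id(item_id, collection_id),
    ):
        raise ConflictError(
            f"Item {item_id} in collection {collection_id} already exists"
        )
```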

async def delete_item(
self, item_id: str, collection_id: str, refresh: bool = False
):
18 changes: 11 additions & 7 deletions stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
@@ -861,7 +861,13 @@ def bulk_sync_prep_create_item(
logger.debug(f"Item {item['id']} prepared successfully.")
return prepped_item

-    async def create_item(self, item: Item, refresh: bool = False):
+    async def create_item(
+        self,
+        item: Item,
+        refresh: bool = False,
+        base_url: str = "",
+        exist_ok: bool = False,
+    ):
"""Database logic for creating one item.

Args:
@@ -877,18 +883,16 @@ async def create_item(self, item: Item, refresh: bool = False):
# todo: check if collection exists, but cache
item_id = item["id"]
collection_id = item["collection"]
-        es_resp = await self.client.index(
+        item = await self.async_prep_create_item(
+            item=item, base_url=base_url, exist_ok=exist_ok
+        )
+        await self.client.index(
            index=index_alias_by_collection_id(collection_id),
            id=mk_item_id(item_id, collection_id),
            body=item,
            refresh=refresh,
        )

-        if (meta := es_resp.get("meta")) and meta.get("status") == 409:
-            raise ConflictError(
-                f"Item {item_id} in collection {collection_id} already exists"
-            )

async def delete_item(
self, item_id: str, collection_id: str, refresh: bool = False
):
150 changes: 150 additions & 0 deletions stac_fastapi/tests/clients/test_bulk_transactions.py
@@ -0,0 +1,150 @@
import os
import uuid
from copy import deepcopy

import pytest
from pydantic import ValidationError

from stac_fastapi.extensions.third_party.bulk_transactions import Items
from stac_fastapi.types.errors import ConflictError

from ..conftest import MockRequest, create_item

if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings
else:
from stac_fastapi.elasticsearch.config import (
ElasticsearchSettings as SearchSettings,
)


@pytest.mark.asyncio
async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client):
items = {}
for _ in range(10):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
items[_item["id"]] = _item

# fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
# assert len(fc["features"]) == 0

bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)

fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
assert len(fc["features"]) >= 10


@pytest.mark.asyncio
async def test_bulk_item_insert_with_raise_on_error(
ctx, core_client, txn_client, bulk_txn_client
):
"""
Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false.
This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError
is raised for conflicting items. When set to false, the operation logs errors
and continues gracefully.
"""

# Insert an initial item to set up a conflict
initial_item = deepcopy(ctx.item)
initial_item["id"] = str(uuid.uuid4())
await create_item(txn_client, initial_item)

# Verify the initial item is inserted
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
assert len(fc["features"]) >= 1

# Create conflicting items (same ID as the initial item)
conflicting_items = {initial_item["id"]: deepcopy(initial_item)}

# Test with RAISE_ON_BULK_ERROR set to true
os.environ["RAISE_ON_BULK_ERROR"] = "true"
bulk_txn_client.database.sync_settings = SearchSettings()

with pytest.raises(ConflictError):
bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True)

# Test with RAISE_ON_BULK_ERROR set to false
os.environ["RAISE_ON_BULK_ERROR"] = "false"
bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings
result = bulk_txn_client.bulk_item_insert(
Items(items=conflicting_items), refresh=True
)

# Validate the results
assert "Successfully added/updated 1 Items" in result

# Clean up the inserted item
await txn_client.delete_item(initial_item["id"], ctx.item["collection"])


@pytest.mark.asyncio
async def test_feature_collection_insert(
core_client,
txn_client,
ctx,
):
features = []
for _ in range(10):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
features.append(_item)

feature_collection = {"type": "FeatureCollection", "features": features}

await create_item(txn_client, feature_collection)

fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
assert len(fc["features"]) >= 10


@pytest.mark.asyncio
async def test_bulk_item_insert_validation_error(ctx, core_client, bulk_txn_client):
items = {}
# Add 9 valid items
for _ in range(9):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
items[_item["id"]] = _item

# Add 1 invalid item (e.g., missing "datetime")
invalid_item = deepcopy(ctx.item)
invalid_item["id"] = str(uuid.uuid4())
invalid_item["properties"].pop(
"datetime", None
) # Remove datetime to make it invalid
items[invalid_item["id"]] = invalid_item

# The bulk insert should raise a ValidationError due to the invalid item
with pytest.raises(ValidationError):
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)


@pytest.mark.asyncio
async def test_feature_collection_insert_validation_error(
core_client,
txn_client,
ctx,
):
features = []
# Add 9 valid items
for _ in range(9):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
features.append(_item)

# Add 1 invalid item (e.g., missing "datetime")
invalid_item = deepcopy(ctx.item)
invalid_item["id"] = str(uuid.uuid4())
invalid_item["properties"].pop(
"datetime", None
) # Remove datetime to make it invalid
features.append(invalid_item)

feature_collection = {"type": "FeatureCollection", "features": features}

# Assert that a ValidationError is raised due to the invalid item
with pytest.raises(ValidationError):
await create_item(txn_client, feature_collection)