Skip to content

Add es_os_refresh env var to refresh index, ensure refresh passed via kwargs #370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 10, 2025
Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352)
- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Changed

- Refactored CRUD methods in `TransactionsClient` to use the `validate_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Fixed

- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

## [v4.1.0] - 2025-05-09

### Added
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ You can customize additional settings in your `.env` file:
| `RELOAD` | Enable auto-reload for development. | `true` | Optional |
| `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional |
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional |
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
Expand Down
28 changes: 15 additions & 13 deletions stac_fastapi/core/stac_fastapi/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,11 @@ async def create_item(
for feature in features
]
attempted = len(processed_items)

success, errors = await self.database.bulk_async(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
collection_id=collection_id,
processed_items=processed_items,
**kwargs,
)
if errors:
logger.error(
Expand All @@ -729,10 +730,7 @@ async def create_item(

# Handle single item
await self.database.create_item(
item_dict,
refresh=kwargs.get("refresh", False),
base_url=base_url,
exist_ok=False,
item_dict, base_url=base_url, exist_ok=False, **kwargs
)
return ItemSerializer.db_to_stac(item_dict, base_url)

Expand All @@ -757,11 +755,12 @@ async def update_item(
"""
item = item.model_dump(mode="json")
base_url = str(kwargs["request"].base_url)

now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
item["properties"]["updated"] = now

await self.database.create_item(
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
item, base_url=base_url, exist_ok=True, **kwargs
)

return ItemSerializer.db_to_stac(item, base_url)
Expand All @@ -777,7 +776,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
Returns:
None: Returns 204 No Content on successful deletion
"""
await self.database.delete_item(item_id=item_id, collection_id=collection_id)
await self.database.delete_item(
item_id=item_id, collection_id=collection_id, **kwargs
)
return None

@overrides
Expand All @@ -798,8 +799,9 @@ async def create_collection(
"""
collection = collection.model_dump(mode="json")
request = kwargs["request"]

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.create_collection(collection=collection)
await self.database.create_collection(collection=collection, **kwargs)
return CollectionSerializer.db_to_stac(
collection,
request,
Expand Down Expand Up @@ -835,7 +837,7 @@ async def update_collection(

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.update_collection(
collection_id=collection_id, collection=collection
collection_id=collection_id, collection=collection, **kwargs
)

return CollectionSerializer.db_to_stac(
Expand All @@ -860,7 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
Raises:
NotFoundError: If the collection doesn't exist
"""
await self.database.delete_collection(collection_id=collection_id)
await self.database.delete_collection(collection_id=collection_id, **kwargs)
return None


Expand Down Expand Up @@ -937,7 +939,7 @@ def bulk_item_insert(
success, errors = self.database.bulk_sync(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
**kwargs,
)
if errors:
logger.error(f"Bulk sync operation encountered errors: {errors}")
Expand Down
65 changes: 60 additions & 5 deletions stac_fastapi/core/stac_fastapi/core/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,75 @@
MAX_LIMIT = 10000


def get_bool_env(name: str, default: bool = False) -> bool:
def validate_refresh(value: Union[str, bool]) -> str:
"""
Validate the `refresh` parameter value.

Args:
value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean.

Returns:
str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for".
"""
logger = logging.getLogger(__name__)

# Handle boolean-like values using get_bool_env
if isinstance(value, bool) or value in {
"true",
"false",
"1",
"0",
"yes",
"no",
"y",
"n",
}:
is_true = get_bool_env("DATABASE_REFRESH", default=value)
return "true" if is_true else "false"

# Normalize to lowercase for case-insensitivity
value = value.lower()

# Handle "wait_for" explicitly
if value == "wait_for":
return "wait_for"

# Log a warning for invalid values and default to "false"
logger.warning(
f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'."
)
return "false"


def get_bool_env(name: str, default: Union[bool, str] = False) -> bool:
"""
Retrieve a boolean value from an environment variable.

Args:
name (str): The name of the environment variable.
default (bool, optional): The default value to use if the variable is not set or unrecognized. Defaults to False.
default (Union[bool, str], optional): The default value to use if the variable is not set or unrecognized. Defaults to False.

Returns:
bool: The boolean value parsed from the environment variable.
"""
value = os.getenv(name, str(default).lower())
true_values = ("true", "1", "yes", "y")
false_values = ("false", "0", "no", "n")

# Normalize the default value
if isinstance(default, bool):
default_str = "true" if default else "false"
elif isinstance(default, str):
default_str = default.lower()
else:
logger = logging.getLogger(__name__)
logger.warning(
f"The `default` parameter must be a boolean or string, got {type(default).__name__}. "
f"Falling back to `False`."
)
default_str = "false"

# Retrieve and normalize the environment variable value
value = os.getenv(name, default_str)
if value.lower() in true_values:
return True
elif value.lower() in false_values:
Expand All @@ -34,9 +89,9 @@ def get_bool_env(name: str, default: bool = False) -> bool:
logger = logging.getLogger(__name__)
logger.warning(
f"Environment variable '{name}' has unrecognized value '{value}'. "
f"Expected one of {true_values + false_values}. Using default: {default}"
f"Expected one of {true_values + false_values}. Using default: {default_str}"
)
return default
return default_str in true_values


def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
Expand Down
26 changes: 24 additions & 2 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import logging
import os
import ssl
from typing import Any, Dict, Set
from typing import Any, Dict, Set, Union

import certifi
from elasticsearch._async.client import AsyncElasticsearch

from elasticsearch import Elasticsearch # type: ignore[attr-defined]
from stac_fastapi.core.base_settings import ApiBaseSettings
from stac_fastapi.core.utilities import get_bool_env
from stac_fastapi.core.utilities import get_bool_env, validate_refresh
from stac_fastapi.types.config import ApiSettings


Expand Down Expand Up @@ -88,6 +88,17 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the value of the DATABASE_REFRESH environment variable.

Returns:
Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
"""
value = os.getenv("DATABASE_REFRESH", "false")
return validate_refresh(value)

@property
def create_client(self):
"""Create es client."""
Expand All @@ -109,6 +120,17 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the value of the DATABASE_REFRESH environment variable.

Returns:
Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
"""
value = os.getenv("DATABASE_REFRESH", "false")
return validate_refresh(value)

@property
def create_client(self):
"""Create async elasticsearch client."""
Expand Down
Loading