Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- `STAC_INDEX_ASSETS` environment variable to allow asset serialization to be configurable. [#433](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/433)
- Added the `ENV_MAX_LIMIT` environment variable to SFEOS, allowing overriding of the `MAX_LIMIT`, which controls the `?limit` parameter for returned items and STAC collections. [#434](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/434)
- Updated the `format_datetime_range` function to support milliseconds. [#423](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/423)

58 changes: 38 additions & 20 deletions README.md
@@ -66,26 +66,43 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI

## Table of Contents

- [Documentation & Resources](#documentation--resources)
- [Package Structure](#package-structure)
- [Examples](#examples)
- [Performance](#performance)
- [Quick Start](#quick-start)
- [Installation](#installation)
- [Running Locally](#running-locally)
- [Configuration reference](#configuration-reference)
- [Interacting with the API](#interacting-with-the-api)
- [Configure the API](#configure-the-api)
- [Collection pagination](#collection-pagination)
- [Ingesting Sample Data CLI Tool](#ingesting-sample-data-cli-tool)
- [Elasticsearch Mappings](#elasticsearch-mappings)
- [Managing Elasticsearch Indices](#managing-elasticsearch-indices)
- [Snapshots](#snapshots)
- [Reindexing](#reindexing)
- [Auth](#auth)
- [Aggregation](#aggregation)
- [Rate Limiting](#rate-limiting)
- [Datetime-Based Index Management](#datetime-based-index-management)
- [stac-fastapi-elasticsearch-opensearch](#stac-fastapi-elasticsearch-opensearch)
- [Sponsors \& Supporters](#sponsors--supporters)
- [Project Introduction - What is SFEOS?](#project-introduction---what-is-sfeos)
- [Common Deployment Patterns](#common-deployment-patterns)
- [Technologies](#technologies)
- [Table of Contents](#table-of-contents)
- [Documentation \& Resources](#documentation--resources)
- [Package Structure](#package-structure)
- [Examples](#examples)
- [Performance](#performance)
- [Direct Response Mode](#direct-response-mode)
- [Quick Start](#quick-start)
- [Installation](#installation)
- [Running Locally](#running-locally)
- [Using Pre-built Docker Images](#using-pre-built-docker-images)
- [Using Docker Compose](#using-docker-compose)
- [Configuration Reference](#configuration-reference)
- [Datetime-Based Index Management](#datetime-based-index-management)
- [Overview](#overview)
- [When to Use](#when-to-use)
- [Configuration](#configuration)
- [Enabling Datetime-Based Indexing](#enabling-datetime-based-indexing)
- [Related Configuration Variables](#related-configuration-variables)
- [How Datetime-Based Indexing Works](#how-datetime-based-indexing-works)
- [Index and Alias Naming Convention](#index-and-alias-naming-convention)
- [Index Size Management](#index-size-management)
- [Interacting with the API](#interacting-with-the-api)
- [Configure the API](#configure-the-api)
- [Collection Pagination](#collection-pagination)
- [Ingesting Sample Data CLI Tool](#ingesting-sample-data-cli-tool)
- [Elasticsearch Mappings](#elasticsearch-mappings)
- [Managing Elasticsearch Indices](#managing-elasticsearch-indices)
- [Snapshots](#snapshots)
- [Reindexing](#reindexing)
- [Auth](#auth)
- [Aggregation](#aggregation)
- [Rate Limiting](#rate-limiting)

## Documentation & Resources

@@ -228,6 +245,7 @@ You can customize additional settings in your `.env` file:
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |
| `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. If set to `false`, the POST `/collections` route and related transaction endpoints (including bulk transaction operations) will be unavailable in the API. This is useful for deployments where mutating the catalog via the API should be prevented. | `true` | Optional |
| `STAC_ITEM_LIMIT` | Sets the result limit that SFEOS applies to the number of returned items and STAC collections. | `10` | Optional |
| `STAC_INDEX_ASSETS` | Controls whether assets are indexed when added to Elasticsearch/OpenSearch. When enabled, asset fields can be used in search queries. | `false` | Optional |
| `ENV_MAX_LIMIT` | Overrides the default `MAX_LIMIT`, which controls the `limit` parameter for returned items and STAC collections. | `10,000` | Optional |
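
A boolean setting such as `STAC_INDEX_ASSETS` arrives as a string and has to be interpreted before use. The sketch below shows one way to do that. `read_bool_env` is a hypothetical helper written for illustration; the project's own `get_bool_env` may accept different spellings or defaults.

```python
import os


def read_bool_env(name: str, default: bool = False) -> bool:
    """Hypothetical helper: interpret an environment variable as a boolean."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    # Accept a few common spellings of "true"; anything else counts as false.
    return raw.strip().lower() in {"true", "1", "yes", "y"}


os.environ["STAC_INDEX_ASSETS"] = "true"
index_assets = read_bool_env("STAC_INDEX_ASSETS")
```

With the variable unset, the helper falls back to `default`, which matches the `false` default listed in the table.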

> [!NOTE]
84 changes: 84 additions & 0 deletions scripts/reindex_elasticsearch.py
@@ -0,0 +1,84 @@
import asyncio

from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
from stac_fastapi.elasticsearch.database_logic import create_index_templates
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX


async def reindex(client, index, new_index, aliases):
    """Reindex STAC index"""
    print(f"reindexing {index} to {new_index}")

    await client.options(ignore_status=400).indices.create(index=new_index)

    reindex_resp = await client.reindex(
        dest={"index": new_index},
        source={"index": [index]},
        wait_for_completion=False,
        script={
            "source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}",
            "lang": "painless",
        },
    )

    task_id = reindex_resp["task"]

    reindex_complete = False
    while not reindex_complete:
        task_resp = await client.tasks.get(task_id=task_id)

        if "completed" in task_resp and task_resp["completed"]:
            reindex_complete = True

        elif "error" in task_resp:
            reindex_complete = True
            print(f"Reindex failed for {index} with error: {task_resp['error']}")

        else:
            # Yield to the event loop between polls instead of blocking it.
            await asyncio.sleep(60)

    actions = []
    for alias in aliases["aliases"]:
        actions.extend(
            [
                {"add": {"index": new_index, "alias": alias}},
                {"remove": {"index": index, "alias": alias}},
            ]
        )

    await client.indices.update_aliases(actions=actions)


async def run():
    """Reindex all STAC indexes for mapping update"""
    client = AsyncElasticsearchSettings().create_client

    await create_index_templates()

    collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX)
    collections = await client.search(index=COLLECTIONS_INDEX)

    collection_index, collection_aliases = next(iter(collection_response.items()))
    collection_index_name, version = collection_index.rsplit("-", 1)
    new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}"

    await reindex(client, collection_index, new_collection_index, collection_aliases)

    for collection in collections["hits"]["hits"]:

        item_indexes = await client.indices.get_alias(
            name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*"
        )

        for item_index, aliases in item_indexes.items():
            item_index_name, version = item_index.rsplit("-", 1)
            new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}"

            await reindex(client, item_index, new_item_index, aliases)

    await client.close()


if __name__ == "__main__":
    asyncio.run(run())
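
The one-line Painless script passed to `client.reindex` is dense. As a reading aid, here is a rough Python equivalent of what it appears to do to each document: turn the keyed `assets` and `item_assets` maps into lists whose entries carry their original key under `es_key`. The function names and the sample document are illustrative assumptions, not part of the PR.

```python
from typing import Any, Dict


def keyed_map_to_list(doc: Dict[str, Any], field: str) -> None:
    """Rewrite doc[field] from {key: asset} to [{..., "es_key": key}] in place."""
    if field not in doc:
        return
    converted = []
    for key, asset in doc[field].items():
        asset["es_key"] = key  # tag each asset with the key it was stored under
        converted.append(asset)
    doc[field] = converted


def migrate_document(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Apply the conversion to both fields the script touches."""
    for field in ("assets", "item_assets"):
        keyed_map_to_list(doc, field)
    return doc


# Hypothetical sample document.
item = {"id": "scene-1", "assets": {"thumbnail": {"href": "https://example.com/t.png"}}}
migrated = migrate_document(item)
```

Documents that lack either field pass through unchanged, mirroring the `containsKey` guards in the script.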
84 changes: 84 additions & 0 deletions scripts/reindex_opensearch.py
@@ -0,0 +1,84 @@
import asyncio

from stac_fastapi.opensearch.config import AsyncOpensearchSettings
from stac_fastapi.opensearch.database_logic import create_index_templates
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX


async def reindex(client, index, new_index, aliases):
    """Reindex STAC index"""
    print(f"reindexing {index} to {new_index}")

    await client.options(ignore_status=400).indices.create(index=new_index)

    reindex_resp = await client.reindex(
        dest={"index": new_index},
        source={"index": [index]},
        wait_for_completion=False,
        script={
            "source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}",
            "lang": "painless",
        },
    )

    task_id = reindex_resp["task"]

    reindex_complete = False
    while not reindex_complete:
        task_resp = await client.tasks.get(task_id=task_id)

        if "completed" in task_resp and task_resp["completed"]:
            reindex_complete = True

        elif "error" in task_resp:
            reindex_complete = True
            print(f"Reindex failed for {index} with error: {task_resp['error']}")

        else:
            # Yield to the event loop between polls instead of blocking it.
            await asyncio.sleep(60)

    actions = []
    for alias in aliases["aliases"]:
        actions.extend(
            [
                {"add": {"index": new_index, "alias": alias}},
                {"remove": {"index": index, "alias": alias}},
            ]
        )

    await client.indices.update_aliases(actions=actions)


async def run():
    """Reindex all STAC indexes for mapping update"""
    client = AsyncOpensearchSettings().create_client

    await create_index_templates()

    collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX)
    collections = await client.search(index=COLLECTIONS_INDEX)

    collection_index, collection_aliases = next(iter(collection_response.items()))
    collection_index_name, version = collection_index.rsplit("-", 1)
    new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}"

    await reindex(client, collection_index, new_collection_index, collection_aliases)

    for collection in collections["hits"]["hits"]:

        item_indexes = await client.indices.get_alias(
            name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*"
        )

        for item_index, aliases in item_indexes.items():
            item_index_name, version = item_index.rsplit("-", 1)
            new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}"

            await reindex(client, item_index, new_item_index, aliases)

    await client.close()


if __name__ == "__main__":
    asyncio.run(run())
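
Both scripts derive the new index name by bumping a zero-padded numeric suffix and then move every alias from the old index to the new one. The sketch below isolates those two steps as pure functions so they can be checked without a cluster. The function names and the `items_demo` index name are hypothetical.

```python
from typing import Dict, List


def next_index_name(index: str, width: int = 6) -> str:
    """Increment the numeric suffix after the last hyphen, zero-padded to `width`."""
    base, version = index.rsplit("-", 1)
    return f"{base}-{str(int(version) + 1).zfill(width)}"


def alias_swap_actions(old_index: str, new_index: str, aliases: List[str]) -> List[Dict]:
    """Build add/remove pairs that move every alias from old_index to new_index."""
    actions: List[Dict] = []
    for alias in aliases:
        actions.append({"add": {"index": new_index, "alias": alias}})
        actions.append({"remove": {"index": old_index, "alias": alias}})
    return actions


old = "items_demo-000001"  # hypothetical index name
new = next_index_name(old)
actions = alias_swap_actions(old, new, ["items_demo"])
```

Because `rsplit("-", 1)` splits only at the last hyphen, index names that themselves contain hyphens keep their full base name. An index whose suffix is not numeric would raise `ValueError` at `int(version)`.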
43 changes: 33 additions & 10 deletions stac_fastapi/core/stac_fastapi/core/serializers.py
@@ -9,6 +9,7 @@

 from stac_fastapi.core.datetime_utils import now_to_rfc3339_str
 from stac_fastapi.core.models.links import CollectionLinks
+from stac_fastapi.core.utilities import get_bool_env
 from stac_fastapi.types import stac as stac_types
 from stac_fastapi.types.links import ItemLinks, resolve_links

@@ -66,9 +67,10 @@ def stac_to_db(cls, stac_data: stac_types.Item, base_url: str) -> stac_types.Ite
         item_links = resolve_links(stac_data.get("links", []), base_url)
         stac_data["links"] = item_links

-        stac_data["assets"] = [
-            {"es_key": k, **v} for k, v in stac_data.get("assets", {}).items()
-        ]
+        if get_bool_env("STAC_INDEX_ASSETS"):
+            stac_data["assets"] = [
+                {"es_key": k, **v} for k, v in stac_data.get("assets", {}).items()
+            ]

         now = now_to_rfc3339_str()
         if "created" not in stac_data["properties"]:
@@ -97,6 +99,12 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item:
         if original_links:
             item_links += resolve_links(original_links, base_url)

+        if get_bool_env("STAC_INDEX_ASSETS"):
+            assets = {a.pop("es_key"): a for a in item.get("assets", [])}
+
+        else:
+            assets = item.get("assets", {})
+
         return stac_types.Item(
             type="Feature",
             stac_version=item.get("stac_version", ""),
@@ -107,7 +115,7 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item:
             bbox=item.get("bbox", []),
             properties=item.get("properties", {}),
             links=item_links,
-            assets={a.pop("es_key"): a for a in item.get("assets", [])},
+            assets=assets,
         )


@@ -132,9 +140,15 @@ def stac_to_db(
         collection["links"] = resolve_links(
             collection.get("links", []), str(request.base_url)
         )
-        collection["assets"] = [
-            {"es_key": k, **v} for k, v in collection.get("assets", {}).items()
-        ]
+
+        if get_bool_env("STAC_INDEX_ASSETS"):
+            collection["assets"] = [
+                {"es_key": k, **v} for k, v in collection.get("assets", {}).items()
+            ]
+            collection["item_assets"] = [
+                {"es_key": k, **v} for k, v in collection.get("item_assets", {}).items()
+            ]
+
         return collection

     @classmethod
@@ -181,9 +195,18 @@ def db_to_stac(
             collection_links += resolve_links(original_links, str(request.base_url))
         collection["links"] = collection_links

-        collection["assets"] = {
-            a.pop("es_key"): a for a in collection.get("assets", [])
-        }
+        if get_bool_env("STAC_INDEX_ASSETS"):
+            collection["assets"] = {
+                a.pop("es_key"): a for a in collection.get("assets", [])
+            }
+            collection["item_assets"] = {
+                i.pop("es_key"): i for i in collection.get("item_assets", [])
+            }
+
+        else:
+            collection["assets"] = collection.get("assets", {})
+            if item_assets := collection.get("item_assets"):
+                collection["item_assets"] = item_assets

         # Return the stac_types.Collection object
         return stac_types.Collection(**collection)
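
When `STAC_INDEX_ASSETS` is enabled, the serializers store assets as a list of objects tagged with `es_key` and rebuild the keyed dictionary on the way out. The following is a minimal, self-contained sketch of that round trip. `assets_to_db` and `assets_from_db` are hypothetical names, and unlike the diff, which calls `pop` on the stored entries directly, this version copies each entry so the stored list is left untouched.

```python
from typing import Any, Dict, List


def assets_to_db(assets: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten {key: asset} into a list, recording each key under "es_key"."""
    return [{"es_key": key, **asset} for key, asset in assets.items()]


def assets_from_db(stored: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Rebuild {key: asset} from the stored list without mutating it."""
    result: Dict[str, Dict[str, Any]] = {}
    for entry in stored:
        entry = dict(entry)  # shallow copy so pop() does not alter the input
        key = entry.pop("es_key")
        result[key] = entry
    return result


# Hypothetical asset used only for illustration.
original = {"data": {"href": "https://example.com/data.tif", "type": "image/tiff"}}
stored = assets_to_db(original)
restored = assets_from_db(stored)
```

The list form gives every asset the same field paths regardless of its key, which is presumably why it is used when the fields are indexed. With the flag off, the keyed dictionary is stored as-is.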
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@
 import os
 from typing import Any, Dict, Literal, Protocol

+from stac_fastapi.core.utilities import get_bool_env
+

 # stac_pydantic classes extend _GeometryBase, which doesn't have a type field,
 # So create our own Protocol for typing
@@ -134,7 +136,7 @@ class Geometry(Protocol): # noqa
         "id": {"type": "keyword"},
         "collection": {"type": "keyword"},
         "geometry": {"type": "geo_shape"},
-        "assets": {"type": "object"},
+        "assets": {"type": "object", "enabled": get_bool_env("STAC_INDEX_ASSETS")},
         "links": {"type": "object", "enabled": False},
         "properties": {
             "type": "object",
@@ -162,7 +164,7 @@ class Geometry(Protocol): # noqa
         "extent.temporal.interval": {"type": "date"},
         "providers": {"type": "object", "enabled": False},
         "links": {"type": "object", "enabled": False},
-        "item_assets": {"type": "object", "enabled": get_bool_env("STAC_INDEX_ASSETS")},
+        "item_assets": {"type": "object", "enabled": False},
     },
 }
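
In an Elasticsearch or OpenSearch mapping, an `object` field with `"enabled": False` is kept in `_source` but not parsed or indexed, so its subfields cannot be queried. The sketch below shows how a single flag can switch the asset fields between the two behaviours. The function names are hypothetical and only a few properties are shown.

```python
from typing import Any, Dict


def assets_field_mapping(index_assets: bool) -> Dict[str, Any]:
    """Mapping fragment for an asset field; enabled=False stores without indexing."""
    return {"type": "object", "enabled": index_assets}


def items_properties(index_assets: bool) -> Dict[str, Any]:
    """Abbreviated item properties, with only the asset field depending on the flag."""
    return {
        "id": {"type": "keyword"},
        "assets": assets_field_mapping(index_assets),
        "links": {"type": "object", "enabled": False},
    }


searchable = items_properties(index_assets=True)
stored_only = items_properties(index_assets=False)
```

In the diff, the flag is read inside the mapping definitions themselves. If those are module-level constants, as the surrounding context suggests, the environment variable has to be set before the module is imported.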
