Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ dependencies = [

[project.optional-dependencies]
dev = [
"pre-commit>=4.1.0",
"pytest~=8.2.2",
"requests>=2.32.3",
"pre-commit~=4.1.0",
]
server = [
"uvicorn~=0.30.1",
Expand All @@ -36,6 +34,8 @@ lambda = [
]
test = [
"pytest~=8.2.2",
"pytest-asyncio~=1.0.0",
"requests~=2.32.3",
]
iac = [
"aws-cdk-lib~=2.189.0",
Expand Down
80 changes: 63 additions & 17 deletions src/stac_fastapi/indexed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from json import loads
from logging import Logger, getLogger
from re import IGNORECASE, match, search
from typing import Final, List, Optional, cast
from typing import Any, Dict, Final, List, Optional, cast
from urllib.parse import unquote_plus

import attr
Expand All @@ -14,6 +14,7 @@
from stac_fastapi.types.search import BaseSearchPostRequest
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection
from stac_index.indexer.stac_parser import StacParser
from stac_index.io.readers.exceptions import UriNotFoundException
from stac_pydantic.shared import BBox

from stac_fastapi.indexed.constants import rel_parent, rel_root, rel_self
Expand Down Expand Up @@ -57,11 +58,28 @@ async def get_collection(
[collection_id],
)
if row is not None:
return fix_collection_links(
Collection(**await fetch_dict(row[0])),
request,
try:
return fix_collection_links(
Collection(**await fetch_dict(row[0])),
request,
)
except UriNotFoundException as e:
_logger.warning(
"Collection {collection_id} exists in the index but does not exist in the data store, index is outdated".format(
collection_id=collection_id
)
)
raise NotFoundError(
"Collection {collection_id} not found in the indexed data store at {uri}. This means the index is outdated, and suggests this collection has been removed by the data store and may disappear at the next index update.".format(
collection_id=collection_id,
uri=e.uri,
)
)
raise NotFoundError(
"Collection {collection_id} does not exist.".format(
collection_id=collection_id
)
raise NotFoundError(f"Collection {collection_id} does not exist.")
)

async def item_collection(
self,
Expand Down Expand Up @@ -106,16 +124,31 @@ async def get_item(
[collection_id, item_id],
)
if row is not None:
return fix_item_links(
Item(
StacParser(row[1].split(",")).parse_stac_item(
await fetch_dict(row[0])
)[1]
),
request,
)
try:
return fix_item_links(
Item(
StacParser(row[1].split(",")).parse_stac_item(
await fetch_dict(row[0])
)[1]
),
request,
)
except UriNotFoundException as e:
_logger.warning(
"Item {collection_id}/{item_id} exists in the index but does not exist in the data store, index is outdated".format(
collection_id=collection_id, item_id=item_id
)
)
raise NotFoundError(
"Item {item_id} not found in the indexed data store at {uri}. This means the index is outdated, and suggests this item has been removed by the data store and may disappear at the next index update.".format(
item_id=item_id,
uri=e.uri,
)
)
raise NotFoundError(
f"Item {item_id} in Collection {collection_id} does not exist."
"Item {item_id} in Collection {collection_id} does not exist.".format(
item_id=item_id, collection_id=collection_id
)
)

async def post_search(
Expand Down Expand Up @@ -210,9 +243,20 @@ async def _get_minimal_collections_response(self) -> Collections:
)

async def _get_full_collections_response(self, request: Request) -> Collections:
async def get_each_collection(uri: str) -> Optional[Dict[str, Any]]:
    """Fetch one collection document, tolerating entries missing from the data store.

    Awaits ``fetch_dict`` for ``uri``. If that raises ``UriNotFoundException``
    a warning is logged and ``None`` is returned instead of propagating the
    error, so the caller can drop the entry from its results.
    """
    try:
        collection_dict = await fetch_dict(uri=uri)
    except UriNotFoundException:
        warning_message = "Collection '{uri}' exists in the index but does not exist in the data store, index is outdated".format(
            uri=uri
        )
        _logger.warning(warning_message)
        return None
    return collection_dict

fetch_tasks = [
fetch_dict(url)
for url in [
get_each_collection(uri)
for uri in [
row[0]
for row in await fetchall(
f"SELECT stac_location FROM {format_query_object_name('collections')} ORDER BY id"
Expand All @@ -224,7 +268,9 @@ async def _get_full_collections_response(self, request: Request) -> Collections:
Collection(**collection_dict),
request,
)
for collection_dict in await gather(*fetch_tasks)
for collection_dict in [
entry for entry in await gather(*fetch_tasks) if entry is not None
]
]
return Collections(
collections=collections,
Expand Down
30 changes: 26 additions & 4 deletions src/stac_fastapi/indexed/search/search_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from stac_fastapi.types.search import BaseSearchPostRequest
from stac_fastapi.types.stac import Item, ItemCollection
from stac_index.indexer.stac_parser import StacParser
from stac_index.io.readers.exceptions import UriNotFoundException
from stac_pydantic.api.extensions.sort import SortDirections, SortExtension

from stac_fastapi.indexed.constants import collection_wildcard, rel_root, rel_self
Expand Down Expand Up @@ -108,15 +109,36 @@ async def search(self) -> ItemCollection:
)
has_next_page = len(rows) > query_info.limit
has_previous_page = query_info.offset is not None

async def get_each_item(uri: str) -> Optional[Dict[str, Any]]:
    """Fetch one item document, or ``None`` when its URI is no longer resolvable.

    A ``UriNotFoundException`` from ``fetch_dict`` is not propagated: it is
    reported as a warning and ``None`` is returned in place of the document.
    """
    item_dict: Optional[Dict[str, Any]] = None
    try:
        item_dict = await fetch_dict(uri=uri)
    except UriNotFoundException:
        # The index still references this URI but the fetch could not find it.
        stale_entry_message = "Item '{uri}' exists in the index but does not exist in the data store, index is outdated".format(
            uri=uri
        )
        _logger.warning(stale_entry_message)
    return item_dict

fetch_tasks = [
fetch_dict(url) for url in [row[0] for row in rows[0 : query_info.limit]]
get_each_item(url) for url in [row[0] for row in rows[0 : query_info.limit]]
]
fetched_dicts = []
missing_entry_indices = []
for i, entry in enumerate(await gather(*fetch_tasks)):
if entry is None:
missing_entry_indices.append(i)
else:
fetched_dicts.append(entry)
fixes_to_apply = [
fix_list.split(",")
for fix_list in [row[1] for row in rows[0 : query_info.limit]]
for fix_list in [
row[1]
for i, row in enumerate(rows[0 : query_info.limit])
if i not in missing_entry_indices
]
]
fetched_dicts = await gather(*fetch_tasks)

items = [
fix_item_links(
Item(**StacParser(fixers).parse_stac_item(item_dict)[1]),
Expand Down
18 changes: 18 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import sys
from uuid import uuid4

from pytest import MonkeyPatch


def monkeypatch_settings(monkeypatch: MonkeyPatch, **kwargs):
    """Evict the settings module and patch environment variables for a test.

    ``stac_fastapi.indexed.settings`` is removed from ``sys.modules`` (when
    present) so that the next import executes the module again. A default
    set of environment variables is then applied, overlaid with ``kwargs``.

    Args:
        monkeypatch: The pytest ``MonkeyPatch`` used to change the environment.
        **kwargs: Environment variable names mapped to their values. A value
            of ``None`` removes the variable; entries here take precedence
            over the defaults.
    """
    for module_name in ("stac_fastapi.indexed.settings",):
        # pop() with a default is a no-op when the module was never imported.
        sys.modules.pop(module_name, None)
    default_settings = {
        # A fresh random secret on every call.
        "stac_api_indexed_token_jwt_secret": uuid4().hex,
    }
    for key, value in {**default_settings, **kwargs}.items():
        if value is None:
            # raising=False: asking for a variable to be absent must succeed
            # when it is already absent, rather than failing with KeyError.
            monkeypatch.delenv(key, raising=False)
        else:
            monkeypatch.setenv(key, value)
Loading