Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/stac_fastapi/indexed/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class _Settings(ApiSettings):
True # container images set this to false after installing extensions in build
)
create_empty_index_if_missing: bool = False
max_concurrency: int = 10


@lru_cache(maxsize=1)
Expand Down
26 changes: 12 additions & 14 deletions src/stac_fastapi/indexed/stac/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
from typing import Any, Dict, Type
from asyncio import Semaphore
from typing import Any, Dict, Final

from stac_index.io.readers import SourceReader, get_reader_for_uri
from stac_pydantic.collection import Collection
from stac_pydantic.item import Item
from stac_index.io.readers import get_reader_for_uri

_source_reader_instances: Dict[str, Type[SourceReader]] = {}
from stac_fastapi.indexed.settings import get_settings

# If we ever support multi-catalog indexes, or if we see catalogs
# whose STAC items come from different domains, it might be worth
# maintaining one semaphore per domain as a single semaphore may
# unnecessarily limit concurrency when hosts could handle more.
_semaphore: Final[Semaphore] = Semaphore(value=get_settings().max_concurrency)

async def fetch_dict(uri: str) -> Dict[str, Any]:
return await get_reader_for_uri(uri=uri).load_json_from_uri(uri)


async def fetch_item(uri: str) -> Item:
return Item(**(await fetch_dict(uri)))


async def fetch_collection(uri: str) -> Collection:
return Collection(**(await fetch_dict(uri)))
async def fetch_dict(uri: str) -> Dict[str, Any]:
async with _semaphore:
return await get_reader_for_uri(uri=uri).load_json_from_uri(uri)