Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions src/logicblocks/event/store/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
_default_logger = structlog.get_logger("logicblocks.event.store")


class EventStream(EventSource[StreamIdentifier, StoredEvent]):
class EventStream[Category: str = str](
EventSource[StreamIdentifier[Category], StoredEvent]
):
"""A class for interacting with a specific stream of events.

Events can be published into the stream using the `publish` method, and
Expand All @@ -36,7 +38,7 @@ class EventStream(EventSource[StreamIdentifier, StoredEvent]):
def __init__(
self,
adapter: EventStorageAdapter,
stream: StreamIdentifier,
stream: StreamIdentifier[Category],
logger: FilteringBoundLogger = _default_logger,
):
self._adapter = adapter
Expand All @@ -46,7 +48,7 @@ def __init__(
self._identifier = stream

@property
def identifier(self) -> StreamIdentifier:
def identifier(self) -> StreamIdentifier[Category]:
return self._identifier

async def latest(self) -> StoredEvent | None:
Expand Down Expand Up @@ -129,11 +131,13 @@ def __eq__(self, other: Any) -> bool:
return NotImplemented
return (
self._adapter == other._adapter
and self._identifier == other._identifier
and self._identifier == other._identifier # pyright: ignore [reportUnknownMemberType]
)


class EventCategory(EventSource[CategoryIdentifier, StoredEvent]):
class EventCategory[Category: str = str](
EventSource[CategoryIdentifier[Category], StoredEvent]
):
"""A class for interacting with a specific category of events.

Since a category consists of zero or more streams, the category
Expand All @@ -146,22 +150,22 @@ class EventCategory(EventSource[CategoryIdentifier, StoredEvent]):
def __init__(
self,
adapter: EventStorageAdapter,
category: CategoryIdentifier,
category: CategoryIdentifier[Category],
logger: FilteringBoundLogger = _default_logger,
):
self._adapter = adapter
self._logger = logger.bind(category=category.category)
self._identifier = category

@property
def identifier(self) -> CategoryIdentifier:
def identifier(self) -> CategoryIdentifier[Category]:
return self._identifier

async def latest(self) -> StoredEvent | None:
await self._logger.adebug("event.category.reading-latest")
return await self._adapter.latest(target=self._identifier)

def stream(self, *, stream: str) -> EventStream:
def stream(self, *, stream: str) -> EventStream[Category]:
"""Get a stream of events in the category.

Args:
Expand Down Expand Up @@ -213,7 +217,7 @@ def __eq__(self, other: Any) -> bool:
return NotImplemented
return (
self._adapter == other._adapter
and self._identifier == other._identifier
and self._identifier == other._identifier # pyright: ignore [reportUnknownMemberType]
)


Expand Down Expand Up @@ -296,7 +300,9 @@ def __init__(
self._adapter = adapter
self._logger = logger

def stream(self, *, category: str, stream: str) -> EventStream:
def stream[Category: str](
self, *, category: Category, stream: str
) -> EventStream[Category]:
"""Get a stream of events from the store.

This method alone doesn't result in any IO, it instead returns a scoped
Expand All @@ -320,7 +326,9 @@ def stream(self, *, category: str, stream: str) -> EventStream:
stream=StreamIdentifier(category=category, stream=stream),
)

def category(self, *, category: str) -> EventCategory:
def category[Category: str = str](
self, *, category: Category
) -> EventCategory[Category]:
"""Get a category of events from the store.

This method alone doesn't result in any IO, it instead returns a scoped
Expand Down
12 changes: 7 additions & 5 deletions src/logicblocks/event/types/identifier.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from typing import Literal, TypedDict
from typing import Generic, Literal, TypedDict, TypeVar

from . import default_serialisation_fallback
from .json import JsonValue, JsonValueSerialisable

Category = TypeVar("Category", default=str, bound=str, covariant=True)


class Identifier(ABC, JsonValueSerialisable):
@abstractmethod
Expand Down Expand Up @@ -81,10 +83,10 @@ def __repr__(self) -> str:


@dataclass(frozen=True)
class CategoryIdentifier(EventSourceIdentifier):
class CategoryIdentifier(EventSourceIdentifier, Generic[Category]):
__hash__ = Identifier.__hash__

category: str
category: Category

def serialise(
self,
Expand Down Expand Up @@ -124,10 +126,10 @@ def __repr__(self) -> str:


@dataclass(frozen=True)
class StreamIdentifier(EventSourceIdentifier):
class StreamIdentifier(EventSourceIdentifier, Generic[Category]):
__hash__ = Identifier.__hash__

category: str
category: Category
stream: str

def serialise(
Expand Down