Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions cq/_core/related_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import anyio
import injection
from anyio.abc import TaskGroup

from cq._core.message import Event, EventBus
from cq._core.scope import CQScope
Expand All @@ -20,20 +21,23 @@ def add(self, *events: Event) -> None:


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class SimpleRelatedEvents(RelatedEvents):
items: list[Event] = field(default_factory=list)
class AnyIORelatedEvents(RelatedEvents):
event_bus: EventBus
task_group: TaskGroup
history: list[Event] = field(default_factory=list, init=False)

def __bool__(self) -> bool:
return bool(self.items)
def __bool__(self) -> bool: # pragma: no cover
return bool(self.history)

def add(self, *events: Event) -> None:
self.items.extend(events)
self.history.extend(events)
dispatch_method = self.event_bus.dispatch

for event in events:
self.task_group.start_soon(dispatch_method, event)


@injection.scoped(CQScope.TRANSACTION, mode="fallback")
async def related_events_recipe(event_bus: EventBus) -> AsyncIterator[RelatedEvents]:
yield (instance := SimpleRelatedEvents())

async with anyio.create_task_group() as task_group:
for event in instance.items:
task_group.start_soon(event_bus.dispatch, event)
yield AnyIORelatedEvents(event_bus, task_group)
Loading
Loading