[Feature] Add event propagation into event handlers#58
Conversation
📝 Walkthrough

Adds follow-up event propagation across emitters, dispatchers, and processors (BFS sequential and semaphore-bounded parallel); expands notification/serialization APIs; ensures a single LoggingMiddleware and DI container wrapping in bootstraps; updates project metadata and the CI matrix; and adds extensive tests and benchmarks.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as "Client"
    participant Mediator as "Mediator"
    participant Processor as "EventProcessor"
    participant Emitter as "EventEmitter"
    participant Dispatcher as "EventDispatcher"
    participant Handler as "EventHandler"
    Client->>Mediator: send(Request/Command)
    Mediator->>Mediator: handle -> produce DomainEvent
    Mediator->>Processor: emit_events(Event)
    Processor->>Emitter: emit(Event)
    Emitter->>Dispatcher: dispatch(Event)
    Dispatcher->>Handler: handle(Event)
    Handler-->>Dispatcher: follow-up Event(s)
    Dispatcher->>Emitter: dispatch follow-up(s)
    Emitter-->>Processor: return follow-up sequence
    alt Parallel mode (semaphore)
        Processor->>Processor: schedule follow-ups as tasks (bounded semaphore)
        Processor->>Processor: FIRST_COMPLETED enqueues follow-ups from finished tasks
    else Sequential (BFS)
        Processor->>Processor: enqueue follow-ups (BFS) and process in order
    end
    Processor-->>Client: complete
```
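The sequential (BFS) branch of the diagram can be sketched as a simple FIFO loop. Note this is a minimal illustration, not the library's code: `emit` is a hypothetical stand-in for the emitter, and the string events stand in for real event objects.

```python
import asyncio
from collections import deque

# Hypothetical stand-in for EventEmitter.emit: handlers for "root"
# produce two follow-up events; everything else is terminal.
async def emit(event: str) -> list[str]:
    return ["root.1", "root.2"] if event == "root" else []

async def process_sequential(events: list[str]) -> list[str]:
    """Breadth-first processing: follow-ups are appended to the queue
    and handled in FIFO order after the events that produced them."""
    processed: list[str] = []
    queue: deque[str] = deque(events)
    while queue:
        event = queue.popleft()
        processed.append(event)
        queue.extend(await emit(event))
    return processed

print(asyncio.run(process_sequential(["root", "other"])))
# → ['root', 'other', 'root.1', 'root.2']
```

BFS ordering means all initial events are handled before any of their follow-ups, which matches the "enqueue follow-ups (BFS) and process in order" step above.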
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master      #58      +/-  ##
==========================================
+ Coverage   83.74%   87.36%   +3.62%
==========================================
  Files          70       70
  Lines        2436     2478      +42
==========================================
+ Hits         2040     2165     +125
+ Misses        396      313      -83
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/cqrs/events/event.py (1)
252-315: ⚠️ Potential issue | 🟠 Major

Make `from_proto` a classmethod; as an instance method it is effectively unusable as a constructor.

The interface and implementations define `from_proto` as an instance method, but semantically it should construct an event from a proto representation. As an instance method, it requires instantiating an invalid/empty event first just to call the method, defeating its purpose. It should be a `@classmethod` so it can be called as `EventClass.from_proto(proto)`.

🛠️ Suggested fix

```diff
 if typing.TYPE_CHECKING:
@@
-    def from_proto(self, proto: typing.Any) -> Self: ...
+    @classmethod
+    def from_proto(cls, proto: typing.Any) -> Self: ...
@@
-    def from_proto(self, proto: typing.Any) -> Self:
+    @classmethod
+    def from_proto(cls, proto: typing.Any) -> Self:
         """
         Constructs event from proto event object
@@
-    def from_proto(self, proto: typing.Any) -> Self:
+    @classmethod
+    def from_proto(cls, proto: typing.Any) -> Self:
         """
         Constructs event from proto event object
```

Also applies to: 370-378
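A minimal illustration of why the `@classmethod` form works as a constructor. The `Event` class and dict-shaped proto here are hypothetical stand-ins, not the library's types:

```python
import typing

class Event:
    def __init__(self, payload: str) -> None:
        self.payload = payload

    @classmethod
    def from_proto(cls, proto: typing.Any) -> "Event":
        # Called on the class itself, so no throwaway instance is
        # needed before deserializing.
        return cls(payload=proto["payload"])

event = Event.from_proto({"payload": "hello"})
assert event.payload == "hello"
```

With the instance-method signature, the equivalent call would be `Event(payload="").from_proto(proto)`, constructing a dummy object only to discard it.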
src/cqrs/events/map.py (1)
51-52: ⚠️ Potential issue | 🟡 Minor

Minor: typo in error message — "already bind" should be "already bound".

📝 Fix typo

```diff
 if handler_type in self[event_type]:
-    raise KeyError(f"{handler_type} already bind to {event_type}")
+    raise KeyError(f"{handler_type} already bound to {event_type}")
```
🤖 Fix all issues with AI agents
In `@pyproject.toml`:
- Around lines 65-66: Update the protobuf dependency declaration in pyproject.toml, bumping the pin from "protobuf==4.25.5" to a patched release ("protobuf>=4.25.8" or newer) to address CVE-2025-4565 and end-of-support. Replace the exact "protobuf==4.25.5" entry so dependency resolution pulls the patched release while keeping compatibility with Python 3.10–3.12.
In `@src/cqrs/events/event_processor.py`:
- Around lines 91-107: emit_events currently creates background tasks with asyncio.create_task in a fire-and-forget way (via _emit_event_with_semaphore), so it returns before the work completes and swallows exceptions. Change emit_events to track every created Task (e.g., collect tasks in a local list when calling asyncio.create_task for each initial event) and await their completion with `await asyncio.gather(*tasks, return_exceptions=False)`, so callers return only when all work, including recursively scheduled follow-ups from _emit_event_with_semaphore, is finished. Modify _emit_event_with_semaphore to accept a task-collection callback or to append newly created follow-up tasks to a shared list, so follow-ups are also awaited; keep using _event_semaphore and _event_emitter as before, and update the method docstring to remove the line about handler tasks possibly running after return.
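The shared-list variant of that fix can be sketched as follows. This is an illustrative stand-alone version, not the library's implementation: `emit` and the event names are hypothetical, and follow-up tasks are appended to the same list so a final drain loop awaits everything before returning.

```python
import asyncio

# Hypothetical handler: processing "root" yields one follow-up event.
async def emit(event: str, results: list[str]) -> list[str]:
    results.append(event)
    return ["child"] if event == "root" else []

async def emit_events(events: list[str], limit: int = 2) -> list[str]:
    """Run handlers as tasks under a bounded semaphore, but track
    every task (including follow-ups) so this returns only after all
    work has finished and exceptions propagate to the caller."""
    results: list[str] = []
    semaphore = asyncio.Semaphore(limit)
    tasks: list[asyncio.Task] = []

    async def run(event: str) -> None:
        async with semaphore:
            follow_ups = await emit(event, results)
        # Follow-up tasks go into the shared list so they are awaited too.
        for follow_up in follow_ups:
            tasks.append(asyncio.create_task(run(follow_up)))

    for event in events:
        tasks.append(asyncio.create_task(run(event)))
    # New tasks may be appended while gathering, so drain until empty.
    while tasks:
        pending = tasks[:]
        tasks.clear()
        await asyncio.gather(*pending)
    return results

assert asyncio.run(emit_events(["root"])) == ["root", "child"]
```

Because `gather` re-raises handler exceptions, callers see failures instead of them vanishing in orphaned tasks, and `emit_events` no longer returns with work still in flight.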
In `@tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py`:
- Around lines 64-85: The resolve method in _ChainContainer returns shared instances (_h1, _h2, _h3), which lets stateful handlers (_HandlerL1, _HandlerL2) race under parallel benchmarks. Modify _ChainContainer.resolve to construct and return a fresh handler instance on each call (e.g., return a new _HandlerL1()/_HandlerL2()/_HandlerL3() instead of self._h1/_h2/_h3), or otherwise provide task-local handler objects so per-resolve state like _follow_ups cannot be overwritten by concurrent executions.
🧹 Nitpick comments (6)
src/cqrs/events/bootstrap.py (1)
143-149: Avoid duplicating `LoggingMiddleware` when one is already provided.

Appending unconditionally can lead to double logging. Consider appending only if it's not already in the list.

♻️ Suggested refactor

```diff
-middlewares_list: typing.List[mediator_middlewares.Middleware] = list(
-    middlewares or [],
-)
-return setup_mediator(
-    container,
-    events_mapper=events_mapper,
-    middlewares=middlewares_list + [logging_middleware.LoggingMiddleware()],
-)
+middlewares_list: typing.List[mediator_middlewares.Middleware] = list(
+    middlewares or [],
+)
+if not any(
+    isinstance(m, logging_middleware.LoggingMiddleware)
+    for m in middlewares_list
+):
+    middlewares_list.append(logging_middleware.LoggingMiddleware())
+return setup_mediator(
+    container,
+    events_mapper=events_mapper,
+    middlewares=middlewares_list,
+)
```

src/cqrs/events/event_processor.py (1)
83-89: Use a deque for BFS to avoid O(n²) queue pops.

`list.pop(0)` shifts the list each time. For long follow-up chains, a deque with `popleft()` keeps this O(1).

```diff
+from collections import deque
@@
-to_process: list[IEvent] = list(events)
+to_process: deque[IEvent] = deque(events)
 while to_process:
-    event = to_process.pop(0)
+    event = to_process.popleft()
     follow_ups = await self._event_emitter.emit(event)
     to_process.extend(follow_ups)
```

tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py (1)
117-121: Remove the hard-coded sleep in the parallel mode benchmark; await task completion instead.

In parallel mode, `emit_events` returns immediately after scheduling tasks via `asyncio.create_task`, so handler tasks may still run after return. The 0.5 s sleep compensates but adds non-deterministic overhead and risks an insufficient timeout under load. If `emit_events` is updated to fully await pending tasks in parallel mode, the sleep becomes unnecessary and the benchmarks will be more deterministic.

Two benchmark files use this pattern (both at line 119): tests/benchmarks/default/test_benchmark_event_handler_chain.py and tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py.

♻️ Suggested refactor (after `emit_events` awaits completion in parallel mode)

```diff
 async def run() -> None:
     await processor.emit_events([_EventL1(id_="root")])
-    await asyncio.sleep(0.5)
```

src/cqrs/events/event_handler.py (1)
1-2: Minor: import ordering.

The `collections.abc` import appears before `abc`, but standard practice is alphabetical ordering within stdlib imports (`abc` before `collections.abc`).

🔧 Suggested reorder

```diff
-from collections.abc import Sequence
 import abc
+from collections.abc import Sequence
 import typing
```

tests/unit/test_event_processor.py (1)
303-357: Consider potential flakiness from the timing-based assertion.

The test relies on `await asyncio.sleep(0.15)` to wait for parallel tasks. While the sleep duration appears sufficient (3 child events × 0.02 s each under semaphore=2), timing-based waits can cause flaky tests under CI load.

Consider using `asyncio.gather` on tracked tasks or an event/counter to deterministically wait for completion, though the current approach is acceptable for a unit test if flakiness hasn't been observed.

tests/unit/test_event_dispatcher_follow_ups.py (1)
20-31: Handler accumulates follow-ups across invocations.

`_HandlerL1._follow_ups` is never cleared, so if the same handler instance handles multiple events, follow-ups will accumulate. This is fine for this single-dispatch test, but consider clearing at the start of `handle()` if the handler might be reused, matching the pattern used in test_event_processor.py (e.g., `_FanHandler` at line 325).

🔧 Optional: clear follow-ups at the start of handle

```diff
 async def handle(self, event: _EventL1) -> None:
+    self._follow_ups = []
     self.processed.append(event)
     self._follow_ups.append(_EventL2(name="L2_from_" + event.name))
```
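The event/counter idea suggested for the timing-based test above can be sketched as a deterministic wait in place of a fixed sleep. The handler and event names here are hypothetical, not the test's code:

```python
import asyncio

async def main(expected: int = 3) -> int:
    """Wait for handler completion via an asyncio.Event set by the
    last handler, instead of padding with a fixed sleep."""
    seen = 0
    done = asyncio.Event()

    async def handle(event: str) -> None:
        nonlocal seen
        await asyncio.sleep(0)  # stand-in for real handler work
        seen += 1
        if seen == expected:
            done.set()

    for i in range(expected):
        asyncio.create_task(handle(f"event-{i}"))
    # Returns as soon as all handlers have run; the timeout only
    # bounds how long a broken test can hang before failing.
    await asyncio.wait_for(done.wait(), timeout=5)
    return seen

assert asyncio.run(main()) == 3
```

Unlike `sleep(0.15)`, this waits exactly as long as the work takes, so the test neither flakes under CI load nor wastes wall-clock time when handlers finish early.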
Release Notes
New Features
Improvements
Chores