Skip to content

[Feature] Add event propagation into event handlers#58

Merged
vadikko2 merged 10 commits intomasterfrom
add-event-propagation-to-event-handler
Feb 2, 2026
Merged

[Feature] Add event propagation into event handlers#58
vadikko2 merged 10 commits intomasterfrom
add-event-propagation-to-event-handler

Conversation

@vadikko2
Copy link
Owner

@vadikko2 vadikko2 commented Feb 1, 2026

Release Notes

  • New Features

    • Added support for follow-up events in event handlers, enabling multi-level event chains with breadth-first processing
    • Introduced parallel event handling with semaphore-based concurrency control
    • Added protobuf serialization support for events
    • Auto-injected logging middleware for improved observability
    • Support for external dependency injection containers
  • Improvements

    • Expanded event module public API with additional event types and interfaces
    • Event handler properties now return sequences with default empty implementation
    • Enhanced event dispatcher with warning logs for unhandled events
    • Relaxed dependency version constraints for greater flexibility
  • Chores

    • Expanded Python test matrix to 3.10, 3.11, 3.12
    • Updated project metadata and description
    • Version bumped to 4.8.0

Summary by CodeRabbit

  • New Features

    • Multi‑level follow‑up propagation (BFS sequential + semaphore‑controlled parallel); event emission returns collected follow‑ups
    • Automatic single logging middleware and better external DI container support
  • Improvements

    • Expanded public event API and concrete default events() behavior across handlers/steps
    • Dispatcher warns on unknown events; duplicate handler bindings rejected
    • Project description/version updated; examples now reference protobuf
  • Chores

    • CI matrix broadened to Python 3.10/3.11/3.12; publish gated to 3.12
  • Tests

    • Extensive new unit, integration, and benchmark suites for multi‑level chains and follow‑ups

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 1, 2026

📝 Walkthrough

Walkthrough

Adds follow‑up event propagation across emitters, dispatchers, and processors (BFS sequential and semaphore‑bounded parallel), expands notification/serialization APIs, ensures single LoggingMiddleware and DI container wrapping in bootstraps, updates project metadata/CI matrix, and adds extensive tests and benchmarks.

Changes

Cohort / File(s) Summary
CI & Project Metadata
​.github/workflows/python-publish.yml, pyproject.toml
Expand Python test matrix to 3.10–3.12; gate publish to 3.12; bump version to 4.8.0; change description; adjust dependency constraints and move protobuf into examples optional deps.
Event dispatch & processing
src/cqrs/dispatcher/event.py, src/cqrs/events/event_processor.py
Dispatcher logs and returns early if no handlers; _handle_event explicitly returns None and dispatches handler follow‑ups; EventProcessor refactored to BFS for sequential follow-ups and FIRST_COMPLETED semaphore‑bounded scheduling for parallel mode.
Event emission & handler surface
src/cqrs/events/event_emitter.py, src/cqrs/events/event_handler.py
EventEmitter now requires event_map, emit returns a sequence of follow‑ups; domain vs notification flows separated (broker path); EventHandler exposes concrete events() returning empty sequence by default.
Event types & API exports
src/cqrs/events/event.py, src/cqrs/events/__init__.py, src/cqrs/events/map.py
Add from_proto and explicit proto typings for notification events; clarify hash semantics; expand all exports; EventMap documents/enforces non‑overwrite binding and rejects duplicate handler binds.
Bootstrap & DI
src/cqrs/events/bootstrap.py, src/cqrs/requests/bootstrap.py, src/cqrs/saga/bootstrap.py
Wrap external di.Container into internal adapter when needed; ensure a single LoggingMiddleware is appended only if missing (prevent duplicates).
Request / COR / Saga handlers
src/cqrs/requests/request_handler.py, src/cqrs/requests/cor_request_handler.py, src/cqrs/saga/step.py
Remove abstract events properties; provide concrete default events() returning empty sequence with docstrings instructing overrides.
Tests — unit / integration / benchmarks
tests/unit/*, tests/integration/test_event_handler_chain.py, tests/benchmarks/*
Add many unit and integration tests validating follow‑up propagation (EventEmitter/Dispatcher/Processor/Mediator) and multi‑level benchmarks for sequential and semaphore‑limited parallel chains.
Workflow robustness & misc files
.github/workflows/tests.yml, .gitignore, tests/.codspeed/*
Safer file‑list handling for ruff checks; add .codspeed/ to .gitignore; remove generated CodSpeed artifact file.

Sequence Diagram

sequenceDiagram
    participant Client as "Client"
    participant Mediator as "Mediator"
    participant Processor as "EventProcessor"
    participant Emitter as "EventEmitter"
    participant Dispatcher as "EventDispatcher"
    participant Handler as "EventHandler"

    Client->>Mediator: send(Request/Command)
    Mediator->>Mediator: handle -> produce DomainEvent
    Mediator->>Processor: emit_events(Event)

    Processor->>Emitter: emit(Event)
    Emitter->>Dispatcher: dispatch(Event)
    Dispatcher->>Handler: handle(Event)
    Handler-->>Dispatcher: follow-up Event(s)

    Dispatcher->>Emitter: dispatch follow-up(s)
    Emitter-->>Processor: return follow-up sequence

    alt Parallel mode (semaphore)
        Processor->>Processor: schedule follow-ups as tasks (bounded semaphore)
        Processor->>Processor: FIRST_COMPLETED enqueues follow-ups from finished tasks
    else Sequential (BFS)
        Processor->>Processor: enqueue follow-ups (BFS) and process in order
    end

    Processor-->>Client: complete
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Add new banchmarks #52 — Adds benchmark tests and CI/benchmark-related changes that overlap with the new benchmarks and CI workflow updates in this PR.

Poem

🐇 Hoppity-hop, I trace each new cue,

Handlers birthing hops that bustle on through.
BFS or semaphores, the pipeline hums bright,
I nibble on logs and dance in the night.
New events, new trails — a rabbit’s delight.

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 19.76% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title '[Feature] Add event propagation into event handlers' directly and clearly summarizes the main objective of the PR—enabling follow-up events in event handlers for multi-level event chains.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch add-event-propagation-to-event-handler

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov
Copy link

codecov bot commented Feb 1, 2026

Codecov Report

❌ Patch coverage is 86.95652% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.36%. Comparing base (245d268) to head (6363d20).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
src/cqrs/events/event_processor.py 92.59% 2 Missing ⚠️
src/cqrs/requests/request_handler.py 0.00% 2 Missing ⚠️
src/cqrs/dispatcher/event.py 66.66% 1 Missing ⚠️
src/cqrs/events/event_emitter.py 90.90% 1 Missing ⚠️
src/cqrs/events/map.py 0.00% 1 Missing ⚠️
src/cqrs/requests/cor_request_handler.py 0.00% 1 Missing ⚠️
src/cqrs/saga/step.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master      #58      +/-   ##
==========================================
+ Coverage   83.74%   87.36%   +3.62%     
==========================================
  Files          70       70              
  Lines        2436     2478      +42     
==========================================
+ Hits         2040     2165     +125     
+ Misses        396      313      -83     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@codspeed-hq
Copy link
Contributor

codspeed-hq bot commented Feb 1, 2026

CodSpeed Performance Report

Merging this PR will not alter performance

Comparing add-event-propagation-to-event-handler (6363d20) with master (245d268)

Summary

✅ 27 untouched benchmarks
🆕 31 new benchmarks

Performance Changes

Benchmark BASE HEAD Efficiency
🆕 test_benchmark_saga_sqlalchemy_full_transaction N/A 19 ms N/A
🆕 test_benchmark_cor_first_handler N/A 3.1 ms N/A
🆕 test_benchmark_cor_ten_requests_first_handler N/A 23.7 ms N/A
🆕 test_benchmark_saga_memory_ten_transactions N/A 8.7 ms N/A
🆕 test_benchmark_complex_nested_structure N/A 58.5 µs N/A
🆕 test_benchmark_cor_third_handler N/A 3.1 ms N/A
🆕 test_benchmark_event_chain_three_levels_sequential N/A 7.2 ms N/A
🆕 test_benchmark_response_from_dict N/A 51.5 µs N/A
🆕 test_benchmark_cor_second_handler N/A 3.1 ms N/A
🆕 test_benchmark_request_from_dict N/A 54.7 µs N/A
🆕 test_benchmark_event_chain_three_levels_parallel N/A 9.1 ms N/A
🆕 test_benchmark_saga_sqlalchemy_single_step N/A 10.8 ms N/A
🆕 test_benchmark_response_to_dict N/A 38.6 µs N/A
🆕 test_benchmark_stream_single_item N/A 1.2 ms N/A
🆕 test_benchmark_saga_memory_single_step N/A 838.2 µs N/A
🆕 test_benchmark_event_chain_three_levels_parallel N/A 9.2 ms N/A
🆕 test_benchmark_event_processing N/A 860.6 µs N/A
🆕 test_benchmark_event_chain_three_levels_sequential N/A 7 ms N/A
🆕 test_benchmark_multiple_events N/A 3.4 ms N/A
🆕 test_benchmark_request_to_dict N/A 43.2 µs N/A
... ... ... ... ...

ℹ️ Only the first 20 benchmarks are displayed. Go to the app to view all benchmarks.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/cqrs/events/event.py (1)

252-315: ⚠️ Potential issue | 🟠 Major

Make from_proto a classmethod; instance method is effectively unusable as a constructor.

The interface and implementations define from_proto as an instance method, but semantically it should construct an event from a proto representation. As an instance method, it requires instantiating an invalid/empty event first just to call the method, defeating its purpose. It should be a @classmethod so it can be called as EventClass.from_proto(proto).

🛠️ Suggested fix
     if typing.TYPE_CHECKING:
@@
-        def from_proto(self, proto: typing.Any) -> Self: ...
+        `@classmethod`
+        def from_proto(cls, proto: typing.Any) -> Self: ...
@@
-    def from_proto(self, proto: typing.Any) -> Self:
+    `@classmethod`
+    def from_proto(cls, proto: typing.Any) -> Self:
         """
         Constructs event from proto event object
@@
-    def from_proto(self, proto: typing.Any) -> Self:
+    `@classmethod`
+    def from_proto(cls, proto: typing.Any) -> Self:
         """
         Constructs event from proto event object

Also applies to: 370-378

src/cqrs/events/map.py (1)

51-52: ⚠️ Potential issue | 🟡 Minor

Minor: Typo in error message.

"already bind" should be "already bound".

📝 Fix typo
             if handler_type in self[event_type]:
-                raise KeyError(f"{handler_type} already bind to {event_type}")
+                raise KeyError(f"{handler_type} already bound to {event_type}")
🤖 Fix all issues with AI agents
In `@pyproject.toml`:
- Around line 65-66: Update the protobuf dependency declaration in
pyproject.toml to a fixed version (bump "protobuf==4.25.5" to "protobuf>=4.25.8"
or a newer safe release) to address CVE-2025-4565 and end-of-support; ensure the
new constraint replaces the exact "protobuf==4.25.5" entry so dependency
resolution pulls the patched release while keeping compatibility with Python
3.10–3.12.

In `@src/cqrs/events/event_processor.py`:
- Around line 91-107: emit_events currently creates background tasks with
asyncio.create_task in a fire-and-forget way (via _emit_event_with_semaphore),
causing emit_events to return before work completes and swallow exceptions;
change emit_events to track all created Task objects (e.g., collect tasks in a
local list when calling asyncio.create_task for each initial event) and await
their completion with await asyncio.gather(*tasks, return_exceptions=False) so
callers only return when all work (including recursively scheduled follow-ups
from _emit_event_with_semaphore) is finished; modify _emit_event_with_semaphore
to accept a task-collection callback or append newly created follow-up tasks to
a shared list (so follow-ups are also awaited), keep using _event_semaphore and
_event_emitter as before, and update the method docstring to remove the line
about handler tasks possibly running after return.

In `@tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py`:
- Around line 64-85: The resolve method in _ChainContainer currently returns
shared instances (_h1, _h2, _h3) which lets stateful handlers (_HandlerL1,
_HandlerL2) race under parallel benchmarks; modify _ChainContainer.resolve to
construct and return fresh handler instances each call (e.g. return new
_HandlerL1()/ _HandlerL2()/ _HandlerL3() instead of self._h1/_h2/_h3) or
otherwise provide task-local handler objects so per-resolve state like
_follow_ups cannot be overwritten by concurrent executions.
🧹 Nitpick comments (6)
src/cqrs/events/bootstrap.py (1)

143-149: Avoid duplicating LoggingMiddleware when already provided.

Appending unconditionally can lead to double logging. Consider appending only if it’s not already in the list.

♻️ Suggested refactor
-    middlewares_list: typing.List[mediator_middlewares.Middleware] = list(
-        middlewares or [],
-    )
-    return setup_mediator(
-        container,
-        events_mapper=events_mapper,
-        middlewares=middlewares_list + [logging_middleware.LoggingMiddleware()],
-    )
+    middlewares_list: typing.List[mediator_middlewares.Middleware] = list(
+        middlewares or [],
+    )
+    if not any(
+        isinstance(m, logging_middleware.LoggingMiddleware)
+        for m in middlewares_list
+    ):
+        middlewares_list.append(logging_middleware.LoggingMiddleware())
+    return setup_mediator(
+        container,
+        events_mapper=events_mapper,
+        middlewares=middlewares_list,
+    )
src/cqrs/events/event_processor.py (1)

83-89: Use a deque for BFS to avoid O(n²) queue pops.

list.pop(0) shifts the list each time. For long follow-up chains, a deque with popleft() keeps this O(1).

+from collections import deque
@@
-            to_process: list[IEvent] = list(events)
+            to_process: deque[IEvent] = deque(events)
             while to_process:
-                event = to_process.pop(0)
+                event = to_process.popleft()
                 follow_ups = await self._event_emitter.emit(event)
                 to_process.extend(follow_ups)
tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py (1)

117-121: Remove hard-coded sleep in parallel mode benchmark; await task completion instead.

In parallel mode, emit_events returns immediately after scheduling tasks via asyncio.create_task—handler tasks may still run after return. The 0.5s sleep compensates but adds non-deterministic overhead and risks insufficient timeout under load. If emit_events is updated to fully await pending tasks in parallel mode, the sleep becomes unnecessary and benchmarks will be more deterministic.

Two benchmark files use this pattern (both line 119): tests/benchmarks/default/test_benchmark_event_handler_chain.py and tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py.

♻️ Suggested refactor (after `emit_events` awaits completion in parallel mode)
     async def run() -> None:
         await processor.emit_events([_EventL1(id_="root")])
-        await asyncio.sleep(0.5)
src/cqrs/events/event_handler.py (1)

1-2: Minor: Import ordering.

The collections.abc import appears before abc, but standard practice is alphabetical ordering within stdlib imports (abc before collections.abc).

🔧 Suggested reorder
-from collections.abc import Sequence
 import abc
+from collections.abc import Sequence
 import typing
tests/unit/test_event_processor.py (1)

303-357: Consider potential flakiness from timing-based assertion.

The test relies on await asyncio.sleep(0.15) to wait for parallel tasks. While the sleep duration appears sufficient (3 child events × 0.02s each under semaphore=2), timing-based waits can cause flaky tests under CI load.

Consider using asyncio.gather on tracked tasks or an event/counter to deterministically wait for completion, though the current approach is acceptable for a unit test if flakiness hasn't been observed.

tests/unit/test_event_dispatcher_follow_ups.py (1)

20-31: Handler accumulates follow-ups across invocations.

_HandlerL1._follow_ups is never cleared, so if the same handler instance handles multiple events, follow-ups will accumulate. This is fine for this single-dispatch test, but consider clearing at the start of handle() if the handler might be reused, matching the pattern used in test_event_processor.py (e.g., _FanHandler at line 325).

🔧 Optional: Clear follow-ups at start of handle
     async def handle(self, event: _EventL1) -> None:
+        self._follow_ups = []
         self.processed.append(event)
         self._follow_ups.append(_EventL2(name="L2_from_" + event.name))

@vadikko2 vadikko2 merged commit 976b3ba into master Feb 2, 2026
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant