Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.12" ]
python-version: [ "3.10", "3.11", "3.12" ]

steps:
- uses: actions/checkout@v4
Expand All @@ -28,6 +28,6 @@ jobs:
- name: Build package
run: python -m build
- name: Publish package
if: success() && github.event_name == 'release'
if: success() && github.event_name == 'release' && matrix.python-version == '3.12'
run: |
twine upload dist/* --username __token__ --password ${{ secrets.PYPI_API_TOKEN }}
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ jobs:
echo "No Python files changed, skipping ruff check"
exit 0
fi
cat changed.txt | xargs -r ruff check --config ruff.toml
while IFS= read -r f; do [ -f "$f" ] && echo "$f"; done < changed.txt | xargs -r ruff check --config ruff.toml

- name: Run ruff format check
run: |
if [ "${{ steps.changed.outputs.has_changes }}" != "true" ]; then
echo "No Python files changed, skipping ruff format"
exit 0
fi
cat changed.txt | xargs -r ruff format --check --config ruff.toml
while IFS= read -r f; do [ -f "$f" ] && echo "$f"; done < changed.txt | xargs -r ruff format --check --config ruff.toml

- name: Run pyright
run: |
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,5 @@ tmp/

# UV lock
uv.lock

.codspeed/
18 changes: 9 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ classifiers = [
]
dependencies = [
"dataclass-wizard==0.*",
"di[anyio]==0.79.2",
"dependency-injector>=4.48.2",
"di[anyio]==0.*",
"dependency-injector>=4.0",
"orjson==3.*",
"pydantic==2.*",
"python-dotenv==1.0.1",
"retry-async==0.1.4",
"python-dotenv==1.*",
"retry-async==0.1.*",
"sqlalchemy[asyncio]==2.0.*",
"typing-extensions>=4.0.0"
"typing-extensions>=4.0"
]
description = "Python CQRS pattern implementation"
description = "Event-Driven Architecture Framework for Distributed Systems"
maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.7.3"
version = "4.8.0"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand Down Expand Up @@ -62,10 +62,10 @@ examples = [
"faststream[kafka]==0.5.28",
"faker>=37.12.0",
"uvicorn==0.32.0",
"aiohttp==3.13.2"
"aiohttp==3.13.2",
"protobuf>=4.25.8",
]
kafka = ["aiokafka==0.10.0"]
protobuf = ["protobuf==4.25.5"]
rabbit = ["aio-pika==9.3.0"]

[project.urls]
Expand Down
5 changes: 4 additions & 1 deletion src/cqrs/dispatcher/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ async def _handle_event(
self,
event: IEvent,
handle_type: typing.Type[_EventHandler],
):
) -> None:
handler: _EventHandler = await self._container.resolve(handle_type)
await handler.handle(event)
for follow_up in handler.events:
await self.dispatch(follow_up)

async def dispatch(self, event: IEvent) -> None:
handler_types = self._event_map.get(type(event), [])
Expand All @@ -38,5 +40,6 @@ async def dispatch(self, event: IEvent) -> None:
"Handlers for event %s not found",
type(event).__name__,
)
return
for h_type in handler_types:
await self._handle_event(event, h_type)
12 changes: 12 additions & 0 deletions src/cqrs/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
"""Event types, handlers, emitter, and event map for the CQRS events layer.

Public API:
- Event types: :class:`Event`, :class:`DomainEvent`, :class:`NotificationEvent`,
and their interfaces/base classes.
- :class:`EventHandler` — handler interface; implement :meth:`EventHandler.handle`
and optionally :attr:`EventHandler.events` for follow-up events.
- :class:`EventEmitter` — sends domain events to handlers and notification events
to a message broker.
- :class:`EventMap` — registry of event type -> handler types; use :meth:`EventMap.bind`.
"""

from cqrs.events.event import (
DCEvent,
DCDomainEvent,
Expand Down
57 changes: 56 additions & 1 deletion src/cqrs/events/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@ def setup_mediator(
middlewares: typing.Iterable[mediator_middlewares.Middleware],
events_mapper: typing.Callable[[events.EventMap], None] | None = None,
) -> cqrs.EventMediator:
"""
Create an event mediator with the given container and middlewares.

Args:
container: DI container (e.g. :class:`cqrs.container.di.DIContainer`) or
any implementation of :class:`cqrs.container.protocol.Container`.
middlewares: Middleware chain for the mediator (e.g. logging).
events_mapper: Optional callable that receives an :class:`~cqrs.events.map.EventMap`
and binds event types to handler types via :meth:`~cqrs.events.map.EventMap.bind`.

Returns:
Configured :class:`cqrs.EventMediator` instance.

Example::

def bind_events(event_map: events.EventMap) -> None:
event_map.bind(OrderCreatedEvent, OrderCreatedEventHandler)

mediator = setup_mediator(
container=di_container,
middlewares=[logging_middleware.LoggingMiddleware()],
events_mapper=bind_events,
)
await mediator.emit(OrderCreatedEvent(order_id="1"))
"""
_events_mapper = events.EventMap()
if events_mapper is not None:
events_mapper(_events_mapper)
Expand Down Expand Up @@ -71,6 +96,34 @@ def bootstrap(
events_mapper: typing.Callable[[events.EventMap], None] | None = None,
on_startup: typing.List[typing.Callable[[], None]] | None = None,
) -> cqrs.EventMediator:
"""
Bootstrap an event mediator with optional middlewares and event bindings.

If ``di_container`` is a :class:`di.Container`, it is wrapped in
:class:`cqrs.container.di.DIContainer`. Logging middleware is appended
to the middleware list. Runs all ``on_startup`` callables before setup.

Args:
di_container: DI container from the ``di`` package or a CQRS container.
middlewares: Optional list of middlewares (e.g. logging, metrics).
events_mapper: Optional callable that receives an :class:`~cqrs.events.map.EventMap`
and binds event types to handler types.
on_startup: Optional list of callables to run before creating the mediator.

Returns:
Configured :class:`cqrs.EventMediator` with logging middleware enabled.

Example::

def bind_events(event_map: events.EventMap) -> None:
event_map.bind(OrderCreatedEvent, OrderCreatedEventHandler)

mediator = bootstrap(
di_container=di.Container(),
events_mapper=bind_events,
)
await mediator.emit(OrderCreatedEvent(order_id="1"))
"""
if on_startup is None:
on_startup = []

Expand All @@ -90,8 +143,10 @@ def bootstrap(
middlewares_list: typing.List[mediator_middlewares.Middleware] = list(
middlewares or [],
)
if not any(isinstance(m, logging_middleware.LoggingMiddleware) for m in middlewares_list):
middlewares_list.append(logging_middleware.LoggingMiddleware())
return setup_mediator(
container,
events_mapper=events_mapper,
middlewares=middlewares_list + [logging_middleware.LoggingMiddleware()],
middlewares=middlewares_list,
)
48 changes: 43 additions & 5 deletions src/cqrs/events/event.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import abc
import dataclasses
from dataclass_wizard import fromdict, asdict
import datetime
import os
import sys
import typing
import uuid
import sys

import dotenv
import pydantic
from dataclass_wizard import asdict, fromdict

if sys.version_info >= (3, 11):
from typing import Self # novm
Expand Down Expand Up @@ -251,6 +251,9 @@ class INotificationEvent(IEvent, typing.Generic[PayloadT]):

def proto(self) -> typing.Any: ... # Method for protobuf representation

@classmethod
def from_proto(cls, proto: typing.Any) -> Self: ...


@dataclasses.dataclass(frozen=True)
class DCNotificationEvent(
Expand Down Expand Up @@ -300,7 +303,18 @@ def proto(self) -> typing.Any:
NotImplementedError: This method must be implemented by subclasses
that need protobuf serialization.
"""
raise NotImplementedError("Method not implemented for dataclass events")
raise NotImplementedError("Method not implemented")

@classmethod
def from_proto(cls, proto: typing.Any) -> Self:
"""
Constructs event from proto event object

Raises:
NotImplementedError: This method must be implemented by subclasses
that need protobuf deserialization.
"""
raise NotImplementedError("Method not implemented")

def __hash__(self) -> int:
"""
Expand Down Expand Up @@ -345,10 +359,34 @@ class UserRegisteredEvent(PydanticNotificationEvent[dict]):

model_config = pydantic.ConfigDict(from_attributes=True)

def proto(self):
def proto(self) -> typing.Any:
"""
Return protobuf representation of the event.

Raises:
NotImplementedError: This method must be implemented by subclasses
that need protobuf serialization.
"""
raise NotImplementedError("Method not implemented")

def __hash__(self):
@classmethod
def from_proto(cls, proto: typing.Any) -> Self:
"""
Constructs event from proto event object

Raises:
NotImplementedError: This method must be implemented by subclasses
that need protobuf deserialization.
"""
raise NotImplementedError("Method not implemented")

def __hash__(self) -> int:
"""
Return the hash of the event based on its event_id.

Returns:
Hash value of the event_id.
"""
return hash(self.event_id)


Expand Down
Loading