Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cq/_core/dispatcher/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack, suppress
from typing import Protocol, Self, runtime_checkable

from cq._core.middleware import Middleware, MiddlewareGroup
Expand Down Expand Up @@ -40,5 +41,12 @@ async def _invoke_with_middlewares(
handler: Callable[[I], Awaitable[O]],
input_value: I,
/,
fail_silently: bool = False,
) -> O:
return await self.__middleware_group.invoke(handler, input_value)
async with AsyncExitStack() as stack:
if fail_silently:
stack.enter_context(suppress(Exception))

return await self.__middleware_group.invoke(handler, input_value)

return NotImplemented
13 changes: 8 additions & 5 deletions cq/_core/dispatcher/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
from cq._core.handler import (
HandleFunction,
HandlerFactory,
HandlerRegistry,
MultipleHandlerRegistry,
Expand Down Expand Up @@ -53,10 +54,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
self.__registry.subscribe(input_type, factory)
return self

def _handlers_from(
self,
input_type: type[I],
) -> Iterator[Callable[[I], Awaitable[O]]]:
def _handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]:
return self.__registry.handlers_from(input_type)

def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None:
Expand All @@ -75,7 +73,11 @@ async def dispatch(self, input_value: I, /) -> O:
self._trigger_listeners(input_value, task_group)

for handler in self._handlers_from(type(input_value)):
return await self._invoke_with_middlewares(handler, input_value)
return await self._invoke_with_middlewares(
handler,
input_value,
handler.fail_silently,
)

return NotImplemented

Expand All @@ -95,4 +97,5 @@ async def dispatch(self, input_value: I, /) -> None:
self._invoke_with_middlewares,
handler,
input_value,
handler.fail_silently,
)
99 changes: 60 additions & 39 deletions cq/_core/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,96 @@ class Handler[**P, T](Protocol):
__slots__ = ()

@abstractmethod
async def handle(self, *args: P.args, **kwargs: P.kwargs) -> T:
async def handle(self, /, *args: P.args, **kwargs: P.kwargs) -> T:
raise NotImplementedError


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class HandleFunction[**P, T]:
handler_factory: HandlerFactory[P, T]
handler_type: HandlerType[P, T] | None = field(default=None)
fail_silently: bool = field(default=False)

async def __call__(self, /, *args: P.args, **kwargs: P.kwargs) -> T:
handler = await self.handler_factory()
return await handler.handle(*args, **kwargs)


@runtime_checkable
class HandlerRegistry[I, O](Protocol):
__slots__ = ()

@abstractmethod
def handlers_from(
self,
input_type: type[I],
) -> Iterator[Callable[[I], Awaitable[O]]]:
def handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]:
raise NotImplementedError

@abstractmethod
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
def subscribe(
self,
input_type: type[I],
handler_factory: HandlerFactory[[I], O],
handler_type: HandlerType[[I], O] | None = ...,
fail_silently: bool = ...,
) -> Self:
raise NotImplementedError


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class MultipleHandlerRegistry[I, O](HandlerRegistry[I, O]):
__factories: dict[type[I], list[HandlerFactory[[I], O]]] = field(
__values: dict[type[I], list[HandleFunction[[I], O]]] = field(
default_factory=partial(defaultdict, list),
init=False,
)

def handlers_from(
def handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]:
for key_type in _iter_key_types(input_type):
yield from self.__values.get(key_type, ())

def subscribe(
self,
input_type: type[I],
) -> Iterator[Callable[[I], Awaitable[O]]]:
for key_type in _iter_key_types(input_type):
for factory in self.__factories.get(key_type, ()):
yield _make_handle_function(factory)
handler_factory: HandlerFactory[[I], O],
handler_type: HandlerType[[I], O] | None = None,
fail_silently: bool = False,
) -> Self:
function = HandleFunction(handler_factory, handler_type, fail_silently)

def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
for key_type in _build_key_types(input_type):
self.__factories[key_type].append(factory)
self.__values[key_type].append(function)

return self


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class SingleHandlerRegistry[I, O](HandlerRegistry[I, O]):
__factories: dict[type[I], HandlerFactory[[I], O]] = field(
__values: dict[type[I], HandleFunction[[I], O]] = field(
default_factory=dict,
init=False,
)

def handlers_from(
self,
input_type: type[I],
) -> Iterator[Callable[[I], Awaitable[O]]]:
def handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]:
for key_type in _iter_key_types(input_type):
factory = self.__factories.get(key_type, None)
if factory is not None:
yield _make_handle_function(factory)
function = self.__values.get(key_type, None)
if function is not None:
yield function

def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
entries = {key_type: factory for key_type in _build_key_types(input_type)}
def subscribe(
self,
input_type: type[I],
handler_factory: HandlerFactory[[I], O],
handler_type: HandlerType[[I], O] | None = None,
fail_silently: bool = False,
) -> Self:
function = HandleFunction(handler_factory, handler_type, fail_silently)
entries = {key_type: function for key_type in _build_key_types(input_type)}

for key_type in entries:
if key_type in self.__factories:
if key_type in self.__values:
raise RuntimeError(
f"A handler is already registered for the input type: `{key_type}`."
)

self.__factories.update(entries)
self.__values.update(entries)
return self


Expand All @@ -105,6 +127,7 @@ def __call__(
input_or_handler_type: type[I],
/,
*,
fail_silently: bool = ...,
threadsafe: bool | None = ...,
) -> Decorator: ...

Expand All @@ -114,6 +137,7 @@ def __call__[T](
input_or_handler_type: T,
/,
*,
fail_silently: bool = ...,
threadsafe: bool | None = ...,
) -> T: ...

Expand All @@ -123,6 +147,7 @@ def __call__(
input_or_handler_type: None = ...,
/,
*,
fail_silently: bool = ...,
threadsafe: bool | None = ...,
) -> Decorator: ...

Expand All @@ -131,18 +156,24 @@ def __call__[T](
input_or_handler_type: type[I] | T | None = None,
/,
*,
fail_silently: bool = False,
threadsafe: bool | None = None,
) -> Any:
if (
input_or_handler_type is not None
and isclass(input_or_handler_type)
and issubclass(input_or_handler_type, Handler)
):
return self.__decorator(input_or_handler_type, threadsafe=threadsafe)
return self.__decorator(
input_or_handler_type,
fail_silently=fail_silently,
threadsafe=threadsafe,
)

return partial(
self.__decorator,
input_type=input_or_handler_type, # type: ignore[arg-type]
fail_silently=fail_silently,
threadsafe=threadsafe,
)

Expand All @@ -152,11 +183,12 @@ def __decorator(
/,
*,
input_type: type[I] | None = None,
fail_silently: bool = False,
threadsafe: bool | None = None,
) -> HandlerType[[I], O]:
factory = self.injection_module.make_async_factory(wrapped, threadsafe)
input_type = input_type or _resolve_input_type(wrapped)
self.registry.subscribe(input_type, factory)
self.registry.subscribe(input_type, factory, wrapped, fail_silently)
return wrapped


Expand Down Expand Up @@ -190,14 +222,3 @@ def _resolve_input_type[I, O](handler_type: HandlerType[[I], O]) -> type[I]:
f"Unable to resolve input type for handler `{handler_type}`, "
"`handle` method must have a type annotation for its first parameter."
)


def _make_handle_function[I, O](
factory: HandlerFactory[[I], O],
) -> Callable[[I], Awaitable[O]]:
return partial(__handle, factory=factory)


async def __handle[I, O](input_value: I, *, factory: HandlerFactory[[I], O]) -> O:
handler = await factory()
return await handler.handle(input_value)
2 changes: 1 addition & 1 deletion cq/middlewares/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(
self.__exceptions = tuple(exceptions)
self.__retry = retry

async def __call__(self, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
async def __call__(self, /, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
retry = self.__retry

for attempt in range(1, retry + 1):
Expand Down
2 changes: 1 addition & 1 deletion cq/middlewares/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class InjectionScopeMiddleware:
exist_ok: bool = field(default=False, kw_only=True)
threadsafe: bool | None = field(default=None, kw_only=True)

async def __call__(self, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
async def __call__(self, /, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
async with AsyncExitStack() as stack:
try:
await stack.enter_async_context(
Expand Down
16 changes: 16 additions & 0 deletions docs/guides/messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,19 @@ class TrackUserCreatedHandler(NamedTuple):
async def handle(self, event: UserCreatedEvent):
...
```

### fail_silently

The `fail_silently` option suppresses any exception raised by the handler instead of propagating it to the caller. Exceptions can still be caught and handled by middlewares before being suppressed.

This is particularly useful for non-critical event handlers where a failure should not affect the rest of the system:

```python
@event_handler(fail_silently=True)
class TrackUserCreatedHandler(NamedTuple):
analytics: AnalyticsService

async def handle(self, event: UserCreatedEvent):
# An exception here won't propagate to the caller
...
```
11 changes: 11 additions & 0 deletions tests/test_command_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,14 @@ async def handle(self, command: _Command) -> None:
assert len(history.records) == 2
assert isinstance(history.records[0].args[0], _Event)
assert isinstance(history.records[1].args[0], _Command)

async def test_dispatch_with_fail_silently(self) -> None:
class _Command: ...

@command_handler(fail_silently=True)
class _CommandHandler:
async def handle(self, command: _Command) -> None:
raise ValueError

command_bus = find_instance(AnyCommandBus)
assert await command_bus.dispatch(_Command()) is NotImplemented
26 changes: 20 additions & 6 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.