Skip to content

Commit 343d2f2

Browse files
committed
Refactor messagebus handler
1 parent 8d59a26 commit 343d2f2

File tree

1 file changed

+35
-18
lines changed

1 file changed

+35
-18
lines changed

src/allocation/service_layer/messagebus.py

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Union
2+
from typing import Any, List, Union
33

44
from allocation.domain import commands, events
55
from allocation.service_layer import handlers, unit_of_work
@@ -14,30 +14,47 @@ def handle(message: Message, uow: unit_of_work.AbstractUnitOfWork):
1414
while queue:
1515
message = queue.pop(0)
1616
if isinstance(message, events.Event):
17-
for handler in EVENT_HANDLERS[type(message)]:
18-
try:
19-
logger.debug(f"Handling event: {message}")
20-
handler(message, uow)
21-
queue.extend(uow.collect_new_events())
22-
except Exception:
23-
logger.exception(f"Error handling event: {message}")
24-
continue
17+
handle_event(message, queue, uow)
2518
elif isinstance(message, commands.Command):
26-
try:
27-
logger.debug(f"Handling command: {message}")
28-
handler = COMMAND_HANDLERS[type(message)]
29-
result = handler(message, uow)
30-
results.append(result)
31-
queue.extend(uow.collect_new_events())
32-
except Exception:
33-
logger.exception(f"Error handling command: {message}")
34-
raise
19+
result = handle_command(message, queue, uow)
20+
results.append(result)
3521
else:
3622
raise TypeError(f"Unknown message type {type(message)}")
3723

3824
return results
3925

4026

27+
def handle_event(
28+
event: events.Event,
29+
queue: List[Message],
30+
uow: unit_of_work.AbstractUnitOfWork,
31+
) -> None:
32+
for handler in EVENT_HANDLERS[type(event)]:
33+
try:
34+
logger.debug(f"Handling event: {event}")
35+
handler(event, uow)
36+
queue.extend(uow.collect_new_events())
37+
except Exception:
38+
logger.exception(f"Error handling event: {event}")
39+
continue
40+
41+
42+
def handle_command(
43+
command: commands.Command,
44+
queue: List[Message],
45+
uow: unit_of_work.AbstractUnitOfWork,
46+
) -> Any:
47+
try:
48+
logger.debug(f"Handling command: {command}")
49+
handler = COMMAND_HANDLERS[type(command)]
50+
result = handler(command, uow)
51+
queue.extend(uow.collect_new_events())
52+
return result
53+
except Exception:
54+
logger.exception(f"Error handling command: {command}")
55+
raise
56+
57+
4158
EVENT_HANDLERS = {
4259
events.OutOfStock: [handlers.send_out_of_stock_notification],
4360
}

0 commit comments

Comments
 (0)