Skip to content

Commit 5199faa

Browse files
committed
Start using Events to integrate Microservices
1 parent 018dc1c commit 5199faa

File tree

12 files changed

+183
-37
lines changed

12 files changed

+183
-37
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ logs:
2020
test: up test-e2e test-integration test-unit
2121

2222
test-e2e:
23-
docker-compose run --rm --no-deps --entrypoint=pytest app /tests/e2e -v -s
23+
docker-compose run --rm --no-deps --entrypoint=pytest app /tests/e2e -vv -s
2424

2525
test-integration:
2626
docker-compose run --rm --no-deps --entrypoint=pytest app /tests/integration -v -s

docker-compose.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ services:
66
dockerfile: Dockerfile
77
depends_on:
88
- postgres
9+
- redis
910
environment:
1011
- DB_HOST=postgres
1112
- DB_PASSWORD=abc123
1213
- API_HOST=app
14+
- REDIS_HOST=redis
1315
- PYTHONDONTWRITEBYTECODE=1
1416
volumes:
1517
- ./src:/src
@@ -24,3 +26,8 @@ services:
2426
- POSTGRES_PASSWORD=abc123
2527
ports:
2628
- "54321:5432"
29+
30+
redis:
31+
image: redis:alpine
32+
ports:
33+
- "6379:6379"

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ flask==1.1.2
44
isort==5.7.0
55
psycopg2-binary==2.8.6
66
pytest==6.2.2
7+
redis==3.5.3
78
requests==2.25.1
89
sqlalchemy==1.3.23
910
tenacity==6.3.1
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import json
2+
import logging
3+
from dataclasses import asdict
4+
5+
from allocation import config
6+
from redis import Redis
7+
8+
logger = logging.getLogger(__name__)
9+
redis_client = Redis(**config.get_redis_host_and_port())
10+
11+
12+
def publish(channel, event):
13+
logger.debug(f"Publishing channel: {channel}, event: {event}")
14+
redis_client.publish(channel, json.dumps(asdict(event)))

src/allocation/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,9 @@ def get_api_url():
1313
host = os.environ.get("API_HOST", "localhost")
1414
port = 5005 if host == "localhost" else 80
1515
return f"http://{host}:{port}"
16+
17+
18+
def get_redis_host_and_port():
19+
host = os.environ.get("REDIS_HOST", "localhost")
20+
port = 6379 if host == "localhost" else 6379
21+
return dict(host=host, port=port)

src/allocation/domain/events.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ class Event:
55
pass
66

77

8+
@dataclass
9+
class Allocated(Event):
10+
orderid: str
11+
sku: str
12+
qty: int
13+
batchref: str
14+
15+
816
@dataclass
917
class OutOfStock(Event):
1018
sku: str

src/allocation/domain/model.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import List, NewType, Optional, Set
44

55
from allocation.domain.commands import Allocate
6-
from allocation.domain.events import Event, OutOfStock
6+
from allocation.domain.events import Allocated, Event, OutOfStock
77

88
Reference = NewType("Reference", str)
99
Sku = NewType("Sku", str)
@@ -102,6 +102,9 @@ def allocate(self, line: OrderLine):
102102
return None
103103

104104
batch.allocate(line)
105+
self.events.append(
106+
Allocated(line.orderid, line.sku, line.qty, batch.reference)
107+
)
105108

106109
return batch.reference
107110

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import json
2+
import logging
3+
4+
from allocation import config
5+
from allocation.adapters import orm
6+
from allocation.domain import commands
7+
from allocation.service_layer import messagebus, unit_of_work
8+
from redis import Redis
9+
10+
logger = logging.getLevelName(__name__)
11+
redis_client = Redis(**config.get_redis_host_and_port())
12+
13+
14+
def main():
15+
orm.start_mappers()
16+
pubsub = redis_client.pubsub(ignore_subscribe_messages=True)
17+
pubsub.subscribe("change_batch_quantity")
18+
19+
for message in pubsub.listen():
20+
logger.debug(f"Received message: {message}")
21+
22+
23+
def handle_change_batch_quantity(message):
24+
data = json.loads(message["data"])
25+
command = commands.ChangeBatchQuantity(data["reference"], data["qty"])
26+
messagebus.handle(message=command, uow=unit_of_work.SqlAlchemyUnitOfWork())

src/allocation/service_layer/handlers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from allocation.adapters import email
1+
from allocation.adapters import email, event_publisher
22
from allocation.domain import commands, events, model
33
from allocation.service_layer import unit_of_work
44

@@ -65,6 +65,12 @@ def deallocate(
6565
uow.commit()
6666

6767

68+
def publish_allocated_event(
69+
message: events.Allocated, uow: unit_of_work.AbstractUnitOfWork
70+
):
71+
event_publisher.publish("line_allocated", message)
72+
73+
6874
def send_out_of_stock_notification(
6975
message: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork
7076
):

src/allocation/service_layer/messagebus.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def handle_command(
6161

6262

6363
EVENT_HANDLERS = {
64+
events.Allocated: [handlers.publish_allocated_event],
6465
events.OutOfStock: [handlers.send_out_of_stock_notification],
6566
}
6667

0 commit comments

Comments
 (0)