LedgerFlow is a small but production-style Python project that demonstrates an Event-Sourced Ledger architecture using Kafka, following the principles of Command Query Responsibility Segregation (CQRS).
- Python 3.11+
- FastAPI: For building the `command_api` and `query_api`.
- Kafka: As a message broker for communication between services.
- PostgreSQL: As the database for the read model.
- SQLAlchemy: For ORM and database interaction.
- Docker & Docker Compose: For containerization and orchestration of the services.
- Poetry: For dependency management.
- Pytest: For testing.
- Ruff: For linting.
The system is designed with a microservices architecture, where each service has a distinct responsibility. This separation of concerns allows for a scalable, resilient, and maintainable system.
```mermaid
graph TD
    subgraph "Client"
        A[User]
    end
    subgraph "Application Services"
        B[command_api]
        C[query_api]
    end
    subgraph "Message Broker"
        D[Kafka]
    end
    subgraph "Data Processing"
        E[processor]
        F[projector]
    end
    subgraph "Database"
        G[PostgreSQL]
    end

    A --sends command--> B
    B --publishes command--> D
    D --command topic--> E
    E --consumes command--> D
    E --publishes event--> D
    D --event topic--> F
    F --consumes event--> G
    C --queries data--> G
    A --requests data--> C
```
- `command_api`: A FastAPI service that exposes endpoints for clients to send commands (e.g., `CreateItemCommand`). It validates the command and publishes it to a Kafka topic named `commands`.
- `processor`: A Kafka consumer that listens to the `commands` topic. It processes the command, applies business logic, and produces a corresponding event (e.g., `ItemCreatedEvent`) to an `events` topic.
- `projector`: A Kafka consumer that subscribes to the `events` topic. It reads the events and projects them into a denormalized read model in a PostgreSQL database. This read model is optimized for querying.
- `query_api`: A FastAPI service that exposes REST endpoints for clients to query the read model from the PostgreSQL database.
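The commands and events these services exchange are plain data objects serialized onto Kafka topics. As a rough sketch of that shape (the field names and helper below are illustrative, not the project's actual schema, which lives in the `shared` package):

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical payload shapes; the real project defines its own models.
@dataclass
class CreateItemCommand:
    name: str
    description: str


@dataclass
class ItemCreatedEvent:
    item_id: str
    name: str
    description: str


def to_message(obj) -> bytes:
    """Serialize a command or event for publishing to a Kafka topic."""
    payload = {"type": type(obj).__name__, "data": asdict(obj)}
    return json.dumps(payload).encode("utf-8")


cmd = CreateItemCommand(name="My First Item", description="A description")
print(to_message(cmd).decode())
```

Tagging each message with its type name lets a consumer dispatch on `payload["type"]` without needing a shared class registry.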
Traditional monolithic applications often use a single database for both reads and writes, which leads to scalability bottlenecks, poor auditability, and tight coupling.
LedgerFlow demonstrates how CQRS (Command Query Responsibility Segregation) combined with Event Sourcing can solve these problems using Kafka as the communication backbone.
- Scalability — separates write and read sides for independent scaling.
- Resilience — Kafka decouples services, allowing asynchronous recovery.
- Auditability — full event log provides an immutable history of changes.
- Flexibility — new read models can evolve without touching the write logic.
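The flexibility point is worth making concrete: because the event log is append-only, a new read model is just a new consumer that replays the log from the beginning. A minimal in-memory sketch (the event shapes here are assumed for illustration, not taken from the project):

```python
# Events as they might arrive, in order, from the `events` topic.
events = [
    {"type": "ItemCreated", "item_id": "a1", "name": "Widget"},
    {"type": "ItemRenamed", "item_id": "a1", "name": "Gadget"},
    {"type": "ItemCreated", "item_id": "b2", "name": "Sprocket"},
]


def project_items(event_log):
    """Fold the event log into a read model keyed by item_id."""
    items = {}
    for event in event_log:
        if event["type"] == "ItemCreated":
            items[event["item_id"]] = {"name": event["name"]}
        elif event["type"] == "ItemRenamed":
            items[event["item_id"]]["name"] = event["name"]
    return items


# A brand-new projection can be built at any time from the same log.
read_model = project_items(events)
print(read_model["a1"]["name"])  # Gadget
```

Changing the projection logic and replaying the same log yields a different read model, with no changes to the write side.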
- User sends a command → published to the Kafka `commands` topic.
- Processor consumes it → emits an event to the `events` topic.
- Projector consumes the event → updates the PostgreSQL read model.
- Query API serves the read data to clients.
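Stripped of Kafka plumbing, the four steps reduce to a handler that turns a command into an event and a projector that writes it into the read store. A self-contained sketch, with SQLite standing in for PostgreSQL and direct function calls standing in for the topics (all names are illustrative):

```python
import sqlite3
import uuid

# In-memory stand-in for the PostgreSQL read model.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE items (id TEXT PRIMARY KEY, name TEXT, description TEXT)")


def process_command(command: dict) -> dict:
    """Step 2: the processor applies business logic and emits an event."""
    assert command["type"] == "CreateItem"
    return {
        "type": "ItemCreated",
        "item_id": str(uuid.uuid4()),
        "name": command["name"],
        "description": command["description"],
    }


def project_event(event: dict) -> None:
    """Step 3: the projector writes the event into the read model."""
    db.execute(
        "INSERT INTO items (id, name, description) VALUES (?, ?, ?)",
        (event["item_id"], event["name"], event["description"]),
    )


def query_items() -> list[tuple]:
    """Step 4: the query side reads the denormalized model."""
    return db.execute("SELECT name, description FROM items").fetchall()


# Step 1: a client command enters the system.
event = process_command(
    {"type": "CreateItem", "name": "My First Item",
     "description": "A description of my item"}
)
project_event(event)
print(query_items())  # [('My First Item', 'A description of my item')]
```

In the real system the two function calls in the middle are replaced by publishes and consumes on the `commands` and `events` topics, which is what makes each step independently scalable and recoverable.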
- Docker
- Docker Compose
- Python 3.11+
- Poetry
1. Clone the repository:

   ```
   git clone https://github.com/romanvlad95/LedgerFlow.git
   cd LedgerFlow
   ```

2. Set up the environment and install dependencies:

   ```
   make setup
   ```

3. Run the services:

   ```
   make up
   ```
The Makefile provides a set of commands to simplify the development workflow:
- `make up`: Start all services in detached mode.
- `make down`: Stop and remove all services.
- `make restart`: Restart all services.
- `make build`: Build the Docker images.
- `make rebuild`: Rebuild the Docker images without using the cache.
- `make logs`: View the logs of all services.
- `make ps`: List the running services.
- `make clean`: Stop and remove all services, and prune the Docker system.
- `make lint`: Run the linter.
- `make test`: Run the tests.
- `make test-docker`: Run the tests in a Docker container.
- `make coverage`: Generate a test coverage report.
- `make verify`: Run all checks (linting and tests).
Once the services are running, you can interact with the API:
1. Create an item:

   ```
   curl -X POST -H "Content-Type: application/json" \
     -d '{"name": "My First Item", "description": "A description of my item"}' \
     http://localhost:8000/items
   ```

2. Get all items:

   ```
   curl http://localhost:8003/items
   ```

3. Get a specific item. After creating an item, you can get its ID from the response of the `GET /items` endpoint:

   ```
   curl http://localhost:8003/items/{item_id}
   ```
To run the tests, use the following command:

```
make test
```

This will run the unit tests for the `command_api` and `query_api`, as well as the end-to-end integration test.

To generate a test coverage report, run:

```
make coverage
```

The report will be available at `htmlcov/index.html`.

To check the code for style and quality, run:

```
make lint
```

To run all checks, including linting and tests in a Dockerized environment, use:
```
make verify
```

```mermaid
%%{init: { 'theme': 'base', 'themeVariables': { 'primaryColor': '#ECEFF4', 'mainBkg': '#2E3440', 'nodeBorder': '#5E81AC' } } }%%
mindmap
  root((LedgerFlow))
    ::icon(fa fa-folder)
    command_api
      ::icon(fa fa-folder)
      app
      tests
    processor
      ::icon(fa fa-folder)
      app
      tests
    projector
      ::icon(fa fa-folder)
      app
      tests
    query_api
      ::icon(fa fa-folder)
      app
      tests
    shared
    tests
```

- `command_api`: Contains the code for the command API service.
- `processor`: Contains the code for the command processing service.
- `projector`: Contains the code for the event projection service.
- `query_api`: Contains the code for the query API service.
- `shared`: Contains shared code, such as command and event models.
- `tests`: Contains integration tests.