This project demonstrates how a service can provide both aggregates and their corresponding change events in a consistent way.
An aggregate is an entity that is stored and retrieved as a whole. The project showcases two example aggregates:
- A "person" aggregate provides information about a person.
- A "location" aggregate provides statistical data about all persons in a city.
Aggregates can be built from any source. In this project, they are created via REST requests, as shown in the table below.
Aggregates are delivered to consumers as JSON objects via HTTP GET
requests.
| # | Input operation | Resulting person aggregates | Resulting location aggregates |
|---|-----------------|-----------------------------|-------------------------------|
| 1 | `POST /persons {"name":"Ann","city":"Rome"}` | `{"1":{"name":"Ann","city":"Rome"}}` | `{"Rome":{"total":1}}` |
| 2 | `POST /persons {"name":"Bob"}` | `{"1":{"name":"Ann","city":"Rome"}}`<br>`{"2":{"name":"Bob"}}` | `{"Rome":{"total":1}}` |
| 3 | `PATCH /persons/2 {"city":"Rome"}` | `{"1":{"name":"Ann","city":"Rome"}}`<br>`{"2":{"name":"Bob","city":"Rome"}}` | `{"Rome":{"total":2}}` |
| 4 | `PATCH /persons/1 {"city":null}` | `{"1":{"name":"Ann"}}`<br>`{"2":{"name":"Bob","city":"Rome"}}` | `{"Rome":{"total":1}}` |
| 5 | `DELETE /persons/2` | `{"1":{"name":"Ann"}}` | (none) |
Every change event encodes the difference between two states of an aggregate. A consumer can rebuild the aggregate by listening to the stream of change events. The protocol of choice is JSON Merge Patch (RFC 7386), not to be confused with JSON Patch (RFC 6902).
| # | Input operation | Resulting person event | Resulting location event |
|---|-----------------|------------------------|--------------------------|
| 1 | `POST /persons {"name":"Ann","city":"Rome"}` | `{"1":{"name":"Ann","city":"Rome"}}` | `{"Rome":{"total":1}}` |
| 2 | `POST /persons {"name":"Bob"}` | `{"2":{"name":"Bob"}}` | (none) |
| 3 | `PATCH /persons/2 {"city":"Rome"}` | `{"2":{"city":"Rome"}}` | `{"Rome":{"total":2}}` |
| 4 | `PATCH /persons/1 {"city":null}` | `{"1":{"city":null}}` | `{"Rome":{"total":1}}` |
| 5 | `DELETE /persons/1` | `{"1":null}` | `{"Rome":null}` |
In contrast to Event Sourcing, the consumer does not need to read the entire event stream. It can bootstrap from the aggregates and keep itself up-to-date by applying subsequent changes. Older events can be deleted, avoiding storage bottlenecks and potential GDPR violations.
The event stream can be transported by any messaging system such as ActiveMQ. This project instead uses Server-Sent Events as an even simpler solution: no shared infrastructure is required, because both aggregates and events are delivered over HTTP, albeit through different server endpoints.
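For orientation, Server-Sent Events are plain text over HTTP: the server responds with content type `text/event-stream` and sends each payload on a `data:` line, terminated by a blank line. A person-event stream for steps 1 and 2 of the event table could look like this (the exact framing used by this project may differ, for example by adding event ids):

```
HTTP/1.1 200 OK
content-type: text/event-stream

data: {"1":{"name":"Ann","city":"Rome"}}

data: {"2":{"name":"Bob"}}
```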
The separation into two steps, bootstrapping and event consumption, requires synchronisation: the consumer must receive change events in order, and it must not miss change events for the previously loaded aggregates. The challenge lies on the server side: the server must store the aggregate and publish the corresponding change event in one atomic operation.
Atomicity can be guaranteed with the Transactional Outbox pattern, which persists the aggregate and the corresponding event in one transaction. A message relay periodically publishes new events and drops older ones. On failure, events may be published repeatedly, so events must be idempotent. This holds for JSON Merge Patch: applying the same patch twice yields the same result as applying it once.
To match aggregate state and events, another table stores the latest revision of the aggregates. Every event in the outbox table is annotated with that aggregate revision.
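To illustrate how outbox and revision table play together, here is a hedged `rusqlite` sketch. The table names (`revision`, `persons`, `person_events`) and the function `store_with_event` are invented for this example and do not claim to match the project's actual schema; the point is that the aggregate upsert, the revision bump, and the event append happen in a single SQLite transaction.

```rust
use rusqlite::{params, Connection, Result};

/// Persists the new aggregate state and its change event atomically:
/// either all three writes succeed or none does (Transactional Outbox).
fn store_with_event(conn: &mut Connection, id: &str, state: &str, event: &str) -> Result<()> {
    let tx = conn.transaction()?;
    // Bump the global revision counter (single-row table, invented for this sketch).
    tx.execute("UPDATE revision SET current = current + 1", [])?;
    let revision: i64 = tx.query_row("SELECT current FROM revision", [], |row| row.get(0))?;
    // Upsert the aggregate...
    tx.execute(
        "INSERT INTO persons (id, state) VALUES (?1, ?2)
         ON CONFLICT(id) DO UPDATE SET state = excluded.state",
        params![id, state],
    )?;
    // ...and append the event to the outbox, annotated with the revision.
    tx.execute(
        "INSERT INTO person_events (revision, payload) VALUES (?1, ?2)",
        params![revision, event],
    )?;
    tx.commit() // all or nothing
}

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE revision (current INTEGER NOT NULL);
         INSERT INTO revision VALUES (0);
         CREATE TABLE persons (id TEXT PRIMARY KEY, state TEXT NOT NULL);
         CREATE TABLE person_events (revision INTEGER PRIMARY KEY, payload TEXT NOT NULL);",
    )?;
    store_with_event(
        &mut conn,
        "1",
        r#"{"name":"Ann","city":"Rome"}"#,
        r#"{"1":{"name":"Ann","city":"Rome"}}"#,
    )
}
```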
The aggregate endpoint delivers the revision in the `X-Revision` header:
```shell
curl -D - http://localhost:3000/persons
```
produces
```
HTTP/1.1 200 OK
content-type: application/json
x-revision: 7
```
After reading the aggregates, the consumer can use the revision value to subscribe to all subsequent change events:
```shell
curl -N -H "X-Revision: 8" http://localhost:3000/person-events
```
Note that the consumer may also use 7 or any smaller value instead, because the events are idempotent. The only limitation is the event retention time: the server periodically deletes older events.
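Putting bootstrap and subscription together, a consumer loop could look like the following Rust sketch, assuming the `reqwest` crate with its `blocking` feature. The project ships a Node.js example consumer; this Rust version is purely illustrative.

```rust
use std::io::{BufRead, BufReader};

// Requires the `reqwest` crate with its `blocking` feature enabled.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: bootstrap from the aggregate endpoint and remember the revision.
    let response = reqwest::blocking::get("http://localhost:3000/persons")?;
    let revision: u64 = response
        .headers()
        .get("x-revision")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.parse().ok())
        .unwrap_or(0);
    let persons = response.text()?;
    println!("bootstrap (revision {revision}): {persons}");

    // Step 2: subscribe to all change events after that revision.
    let stream = reqwest::blocking::Client::new()
        .get("http://localhost:3000/person-events")
        .header("X-Revision", (revision + 1).to_string())
        .send()?;
    // Server-Sent Events arrive as text lines; payloads are prefixed with "data:".
    for line in BufReader::new(stream).lines() {
        if let Some(event) = line?.strip_prefix("data:") {
            // A real consumer would apply the event as a JSON merge patch here.
            println!("event: {}", event.trim());
        }
    }
    Ok(())
}
```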
You need Rust for the server.
```shell
git clone https://github.com/mouton0815/aggregate-event-duality.git
cd aggregate-event-duality
cargo build
```
To use the example consumer, you also need Node.js:
```shell
cd node
npm install
```
Start the server with
```shell
RUST_LOG=info cargo run
```
Note that the server uses an in-memory SQLite database, so the aggregates are lost on restart of the server. This can be changed by replacing the `":memory:"` argument of the `Aggregator` constructor in `server.rs` with a path to a database file, for example `"database.db"`.
When the server is running, you can start the example consumer in another shell:
```shell
node node/consumer.js
```
To build aggregates and produce the corresponding change events, you need to create, update, or delete persons via the REST endpoint of the server. Example requests:
```shell
curl -X POST -H 'Content-Type: application/json' -d '{"name":"Ann","city":"Rome"}' http://localhost:3000/persons
curl -X POST -H 'Content-Type: application/json' -d '{"name":"Bob"}' http://localhost:3000/persons
curl -X PATCH -H 'Content-Type: application/json' -d '{"city":null}' http://localhost:3000/persons/1
curl -X DELETE http://localhost:3000/persons/1
```
The aggregates are available at the following endpoints:
```shell
curl http://localhost:3000/persons
curl http://localhost:3000/locations
```
The corresponding change streams can be accessed via
```shell
curl -N -H "X-Revision: 1" http://localhost:3000/person-events
curl -N -H "X-Revision: 1" http://localhost:3000/location-events
```
For more examples, see `curl-examples.sh`.