Finaps.EventSourcing
is an implementation of the Event Sourcing Pattern in .NET 6
with a focus on Validity, Clarity & Performance.
Finaps.EventSourcing
supports SQL Server, Postgres and Azure Cosmos DB databases.
All Finaps.EventSourcing packages are available under the Apache Licence 2.0.
Alongside CosmosDB, support for relational databases is provided using Entity Framework Core.
Through EF Core, Finaps.EventSourcing
supports SQL Server & PostgreSQL databases.
Finaps.EventSourcing.EF is available on NuGet.
> dotnet add package Finaps.EventSourcing.EF
And Depending on which database you are using, make sure to install the right provider
> dotnet add package Microsoft.EntityFrameworkCore.SqlServer
or
> dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
Like most Entity Framework Core applications, the database is managed by Migrations.
The Finaps.EventSourcing.EF
package adds migrations based on the Records (Events/Snapshots/Projections) you have defined and you are responsible for updating the database using them.
To access this functionality, you have to configure a DbContext which inherits from the RecordContext
class.
You can use the OnModelCreating
method to override or add new Entities to the context.
Your DbContext
is configured in the same way as any other, refer to the Microsoft docs on how to do this,
but your configuration could look something like this:
// appsettings.json
{
"ConnectionStrings": {
"RecordStore": "<SQL Server/Postgres Connection String>"
}
}
// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<RecordContext, MyEntityFrameworkRecordContext>((options =>
{
options.UseSqlServer(configuration.GetConnectionString("RecordStore"));
// or
options.UseNpgsql(configuration.GetConnectionString("RecordStore"));
});
services.AddEventSourcing<MyEntityFrameworkRecordContext>();
}
Now you can use the EntityFrameworkRecordStore
and AggregateService
to power your backend!
Finaps.EventSourcing.Cosmos is available on NuGet.
> dotnet add package Finaps.EventSourcing.Cosmos
Finaps.EventSourcing supports Azure Cosmos DB. To create a Cosmos DB Account, Database and Container, checkout the Microsoft Documentation on Creating Cosmos DB Resources.
For local development, one can use the Docker Cosmos Emulator for Linux/macOS or Windows.
When Creating a Cosmos DB Container to use with Finaps.EventSourcing.Cosmos
, make sure to set PartitionKey
equal to /PartitionId
.
// appsettings.json
{
"Cosmos": {
"ConnectionString": "<Cosmos Connection String>",
"Database": "<Cosmos Database Name>",
"Container": "<Cosmos Container Name>"
}
}
// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddEventSourcing(Configuration.GetSection("Cosmos"));
}
Now you can use the CosmosRecordStore
and AggregateService
to power your backend!
These examples show how a (very simplified) bank account could be modelled using Finaps.EventSourcing
.
It shows how to use the three types of Records this package is concerned with: Events, Snapshots and Projections.
These examples work with both Finaps.EventSourcing.Cosmos
and Finaps.EventSourcing.EF
Checkout the Example Project for a more thorough example on how this package can be used.
Events are immutable Records that describe something that has happened to a particular Aggregate.
public record BankAccountCreatedEvent(string Name, string Iban) : Event<BankAccount>;
public record FundsDepositedEvent(decimal Amount) : Event<BankAccount>;
public record FundsWithdrawnEvent(decimal Amount) : Event<BankAccount>;
public record FundsTransferredEvent(decimal Amount, Guid DebtorAccount, Guid CreditorAccon : Event<BankAccount>;
Note:
- Events are scoped to a particular Aggregate class, specified by
Event<TAggregate>
- Events should be immutable, so either:
- use positional records**
- use
{ get; init; }
accessors
An Aggregate is an aggregation of one or more Events.
public class BankAccount : Aggregate<BankAccount>
{
public string Name { get; private set; }
public string Iban { get; private set; }
public decimal Balance { get; private set; }
protected override void Apply(Event<BankAccount> e)
{
switch (e)
{
case BankAccountCreatedEvent created:
Name = created.Name;
Iban = created.Iban;
break;
case FundsDepositedEvent deposit:
Balance += deposit.Amount;
break;
case FundsWithdrawnEvent withdraw:
Balance -= withdraw.Amount;
break;
case BankAccountFundsTransferredEvent transfer when transfer.DebtorAccount == Id:
Balance -= transfer.Amount;
break;
case BankAccountFundsTransferredEvent transfer when transfer.CreditorAccount == Id:
Balance += transfer.Amount;
break;
case BankAccountFundsTransferredEvent:
throw new InvalidOperationException("Not debtor nor creditor of this transaction");
default:
throw new ArgumentOutOfRangeException(nameof(e));
}
// An error is thrown if any event would cause the bank account balance to drop below 0
if (Balance < 0) throw new InvalidOperationException("Not enough funds");
}
}
Note:
- The
Aggregate.Apply
method contains the logic for aggregating Events.- Using C# 7+ pattern matching, logic can be added based on Event type and Aggregate State
- Aggregate properties should only be updated by applying Events, hence the
{ get; private set; }
accessors. - Aggregates reference themselves, specified by
Aggregate<TAggregate>
, which enables static type checking for theAggregate.Apply(Event<TAggregate>)
method.
// Create new Bank Account Aggregate
var account = new BankAccount();
// This created a new Aggregate.Id
Assert.NotEqual(Guid.Empty, account.Id);
// But left all other values default
Assert.Equal(default, account.Name);
Assert.Equal(default, account.Iban);
Assert.Equal(default, account.Balance);
// Create the Bank Account by applying an Event
account.Apply(new BankAccountCreatedEvent("E. Vent", "SOME IBAN");
// Add some funds to this account using a convenience method
account.Apply(new FundsDepositedEvent(100));
// By calling the Apply method, the Aggregat is now updated
Assert.Equal("E. Vent" , account.Name);
Assert.Equal("SOME IBAN", account.Iban);
Assert.Equal(100 , account.Balance);
// Finally: Persist the Aggregate
// This will store the newly added Events for this BankAccount in the ```IRecordStore```
await AggregateService.PersistAsync(account);
When you want to update an existing Aggregate, you'll first need to rehydrate the Aggregate:
// Rehydrate existing BankAccount, i.e. apply all stored Events to this BankAccount
var account = await AggregateService.RehydrateAsync<BankAccount>(bankAccountId);
// Then add more funds to the account
account.Apply(new FundsDepositedEvent(50));
// Finally, Persist Aggregate. i.e. store the newly added Event(s)
await AggregateService.PersistAsync(account);
or alternatively, the three lines of code above can be replaced with the shorthand notation:
await AggregateService.RehydrateAndPersistAsync<BankAccount>(bankAccountId,
account => account.Apply(new FundsDepositedEvent(50));
Let's spice things up and transfer money from one bank account to another. In such a transaction we want to ensure the transaction either entirely succeeds or entirely fails.
Here's where transactions come into play:
// Create another BankAccount
var anotherAccount = new BankAccount();
anotherAccount.Apply(new BankAccountCreatedEvent("S. Ourcing", "ANOTHER IBAN");
// Define a transfer of funds
var transfer = new FundsTransferredEvent(20, account.Id, anotherAccount.Id);
// Add this Event to both Aggregates
account.Apply(transfer);
anotherAccount.Apply(transfer);
// Persist both aggregates in a single ACID transaction.
await AggregateService.PersistAsync(new[] { account, anotherAccount });
When many Events are stored for a given Aggregate, rehydrating that Aggregate will get more expensive. The meaning of 'many Events' will depend on backend and database hardware, but also your performance requirements. When performance impacts are expected (or even better, measured!), Snapshots can be used to mitigate them.
Snapshots work by storing a copy of the Aggregate state every n Events. When rehydrating, the latest Snapshot and all Events after that will be used, instead of applying all Events from scratch.
To use Snapshots, first define a Snapshot
:
// A Snapshot represents the full state of an Aggregate at a given point in time
public record BankAccountSnapshot(string Name, string Iban, decimal Balance) : Snapshot<BankAccount>;
Note:
- Like Events, Snapshots are scoped to an Aggregate, specified using
Snapshot<TAggregate>
- Like Events, Snapshots should be immutable: use positional records or
{ get; init; }
accessors.
Next, define a SnapshotFactory
:
// The Snapshot Factory is resposible for creating a Snapshot at a given Event interval
public class BankAccountSnapshotFactory : SnapshotFactory<BankAccount, BankAccountSnapshot>
{
// Create a snapshot every 100 Events
public override long SnapshotInterval => 100;
// Create a Snapshot from the Aggregate
protected override BankAccountSnapshot CreateSnapshot(BankAccount aggregate) =>
new BankAccountSnapshot(aggregate.Name, aggregate.Iban, aggregate.Balance);
}
When Persisting an Aggregate using the AggregateService
a Snapshot
will be created when the SnapshotInterval
is exceeded.
This means that Snapshots will not necessarily be created at exactly SnapshotInterval
increments when applying more Events than one at a time.
After creating the Snapshot
and SnapshotFactory
, we have to apply the Snapshot
in the Aggregate.Apply
method:
public class BankAccount : Aggregate<BankAccount>
{
public string Name { get; private set; }
public string Iban { get; private set; }
public decimal Balance { get; private set; }
protected override void Apply(Snapshot<BankAccount> e)
{
switch (e)
{
case BankAccountSnapshot snapshot:
Name = snapshot.Name;
Iban = snapshot.Iban;
Balance = snapshot.Balance;
break;
}
}
}
Sometimes we want to get the historical state of a particular Aggregate at a given point in time.
This is where Event Sourcing really shines, since it is as easy as applying all Events up to a certain date.
When using Snapshots, the latest Snapshot
before the given date
will be used to speed up these point in time rehydrations as well.
// Query the Bank Account for the January 1th, 2022
var account = await AggregateService.RehydrateAsync<BankAccount>(bankAccountId, new DateTime(2022, 01, 01));
All previous examples use the AggregateService
class,
which provides a high level API for rehydrating and persisting Aggregates.
To directly interact with Record
types (Events, Snapshots & Projections), one can use the IRecordStore
.
Some examples of what can be done using the record store:
// Get all Events for a particular Aggregate type
var events = await RecordStore.GetEvents<BankAccount>() // The RecordStore exposes Events/Snapshots/Projections Queryables
.Where(x => x.AggregateId == myAggregateId) // Linq can be used to query all Record types
.OrderBy(x => x.Index) // Order by Aggregate Index
.AsAsyncEnumerable() // Call the AsAsyncEnumerable extension method to finalize the query
.ToListAsync(); // Use any System.Linq.Async method to get the result you want
// Get latest Snapshot for a particular Aggregate
var result = await RecordStore.GetSnapshots<BankAccount>()
.Where(x => x.AggregateId == myAggregateId)
.OrderByDescending(x => x.Index)
.AsAsyncEnumerable()
.FirstAsync();
Not All Linq operations are supported by CosmosDB. For an overview of the supported linq queries in CosmosDB, please refer to the CosmosDB Linq to SQL Translation documentation.
While Event Sourcing is really powerful, it is not well suited for querying many Aggregates at one time. By creating an easily queryable read-only view of an Aggregate, Projections try to tackle this problem.
Creating Projections works the same as creating Snapshots: just define a Projection
and a ProjectionFactory
:
// A Projection represents the current state of an Aggregate
public record BankAccountProjection(string Name, string Iban) : Projection;
// The Projection factory is responsible for creating a Projection every time the Aggregate is persisted
public class BankAccountProjectionFactory : ProjectionFactory<BankAccount, BankAccountProjection>
{
// This particular projection could be used for e.g. an overview page
// We left out 'Balance' (privacy reasons) and made 'Name' uppercase
// Any transformation could be done here, e.g. to make frontend consumption easier/faster
protected override BankAccountProjection CreateProjection(BankAccount aggregate) =>
new BankAccountProjection(aggregate.Name.ToUpper(), aggregate.Iban);
}
Projections are updated whenever the Aggregate of a particular type are persisted. You can make as many Projections for a given Aggregate type as you like.
To query Projections, use the RecordStore
API:
// Get first 10 BankAccount Projections, ordered by the Bank Account name
var projections = await RecordStore.GetProjections<BankAccountProjection>()
.OrderBy(x => x.Name)
.Skip(0)
.Take(10)
.AsAsyncEnumerable()
.ToListAsync();
Note: currently this feature is only available in Finaps.EventSourcing.EF
Aggregates don't usually live in a vacuum, but are related to other Aggregates. However, because Events are the source of truth and Aggregates are never directly persisted, defining foreign keys to ensure data integrity is less trivial than in non-EventSourced systems.
How do we, in the example below, ensure that Post.BlogId
is actually valid?
public class Blog : Aggregate<Blog>
{
public string Name { get; private set; }
}
public class Post : Aggregate<Post>
{
public Guid BlogId { get; private set; }
public string Content { get; private set; }
}
Since Aggregates are never stored directly but are the result of Events,
we should solve this by validating all Events that contain the BlogId
reference.
public record PostCreated(Guid BlogId, string Content) : Event<Post>;
To do this, add the following line of code for every Aggregate reference to the DbContext.OnModelCreating
:
builder.AggregateReference<PostCreated, Blog>(x => x.BlogId);
This creates a relation between the PostCreated
Event and the first Event of the referenced Blog
.
To be precise, the foreign key of the PostCreated
Event is [PartitionId, BlogId, 0]
.
This technique can be used, alongside other techniques, to increase the data integrity of your application.
Note: currently this feature is only available in Finaps.EventSourcing.EF
Even though Projections are not a source of truth, it can be beneficial to define foreign keys on them (e.g. to make certain queries easier).
Given the following projections:
public record SomeProjection : Projection;
public record DependentProjection(Guid SomeProjectionId) : Projection;
A foreign key is created like any other composite foreign key in EF Core:
protected override void OnModelCreating(ModelBuilder builder)
{
builder.Entity<DependentProjection>()
.HasOne<SomeProjection>()
.WithMany()
.HasForeignKey(x => new { x.PartitionId, x.SomeProjectionId });
}
Note: this does assume that both projections exist in the same partition.
This package stores three types of Records using the
IRecordStore
:
Events,
Snapshots and
Projections.
Records are always defined with respect to an Aggregate.
The abstract base Record
is defined below:
public abstract record Record
{
public RecordKind Kind { get; } // = Event | Snapshot | Projection
public string Type { get; init; } // = nameof(<MyRecordType>)
public string? AggregateType { get; init; } // = nameof(<MyAggregateType>)
public Guid PartitionId { get; init; } // = Aggregate.PartitionId
public Guid AggregateId { get; init; } // = Aggregate.Id
public DateTimeOffset Timestamp { get; init; } // Event/Snapshot/Projection creation time
}
Events are Records that describe what happened to an Aggregate. They are added to an append only store and form the source of truth for an Aggregate.
The base Event
is defined below:
public record Event : Record
{
public long Index { get; init; } // The index of this Event in the Event Stream
}
An Event
can be uniquely indexed by the following primary key: [PartitionId, AggregateId, Index]
Snapshots are Events that describe the complete state of an Aggregate at a particular Event
index.
Snapshots can be used to speed up the rehydration of Aggregates.
The base Snapshot
is defined below:
public record Snapshot : Event;
Like Events, A Snapshot
has the following primary key: [PartitionId, AggregateId, Index]
Projections are Records that describe the current state of an Aggregate (and hence the Event
stream).
Projections can be used to speed up queries, especially those involving many Aggregates at the same time.
The base Projection
is defined below:
public record Projection : Record
{
public string? FactoryType { get; init; } // = nameof(<MyProjectionFactory>)
public long Version { get; init; } // = Aggregate.Version
public string Hash { get; init; } // Projection Hash Code, see "Updating Projections"
public bool IsUpToDate { get; } // True if Projection is up to date
}
A Projection
of a particular type can be uniquely indexed by the following primary key: [PartitionId, AggregateId]
Unlike Events, Projections are not a source of truth, but depend on the following data:
- The
Event
stream - The
Aggregate.Apply
logic - The
ProjectionFactory.CreateProjection
logic
In order to accurately reflect the current state, Projection
s have to be updated whenever any of these data changes.
The first point, the Event
stream, is trivial to solve: The AggregateService
will simply update the Projection
whenever Events are persisted.
The last two points are less trivial, since they rely on user code.
To provide a solution, the Projection.Hash
stores a hash representation of the IL Bytecode of the methods that define Projections.
When querying projections, we can compare the stored hash to the current hash to see whether the projection was created using up to date code.
Projection.IsUpToDate
reflects this comparison.
Now we know whether a Projection
is out of date, we can actually update it using the following methods:
- Call
RehydrateAndPersist<TAggregate>(<PartitionId>, <AggregateId>)
. - Use
ProjectionUpdateService
to bulk update may Projections at once.
Aggregates are the result of applying one or more Events.
The base Aggregate is defined below:
public abstract class Aggregate
{
public string Type { get; init; } // = nameof(<MyAggregateType>)
public Guid PartitionId { get; init; } // = Guid.Empty (Can be used to partition data)
public Guid Id { get; init; } // Unique Aggregate Identifier
public long Version { get; private set; } // The number of Events applied to this Aggregate
protected abstract void Apply(Event e); // Logic to apply Events
}
Finaps.EventSourcing.Core
supports both SQL (SQL Server, Postgres) and NoSQL (CosmosDB) databases.
While the same IRecordStore
API is exposed for all databases, there are differences.
Through the topics Storage, Integrity, Migrations and Performance their respective features are covered.
This package stores Events, Snapshots and Projections, but the way they are stored differs between NoSQL and SQL databases.
Consider the following Events:
public record BankAccountCreatedEvent(string Name, string Iban) : Event<BankAccount>;
public record BankAccountFundsDepositedEvent(decimal Amount) : Event<BankAccount>;
For NoSQL, Events, Snapshots and Projections are stored as JSON in the same collection, which allows for great flexibility when it comes to creating, updating and querying them:
- No database migrations have to be done
- Arbitrary or changing data can be stored and queried
The NoSQL JSON representation of the Bank Account Events mentioned above will look like this:
[{
"AggregateType": "BankAccount",
"Type": "BankAccountCreatedEvent",
"Kind": 1, // RecordKind.Event
// Unique Id, encoding <Kind>|<AggregateId>[<Index>]
"id": "Event|f543d76a-3895-48e2-a836-f09d4a00cd7f[0]",
"PartitionId": "00000000-0000-0000-0000-000000000000",
"AggregateId": "f543d76a-3895-48e2-a836-f09d4a00cd7f",
"Index": 0,
"Timestamp": "2022-03-07T15:29:19.941474+01:00",
"Name": "E. Sourcing",
"Iban": "SOME IBAN"
}, {
"AggregateType": "BankAccount",
"Type": "BankAccountFundsDepositedEvent",
"Kind": 1, // RecordKind.Event
// Unique Id, encoding <Kind>|<AggregateId>[<Index>]
"id": "Event|f543d76a-3895-48e2-a836-f09d4a00cd7f[1]",
"PartitionId": "00000000-0000-0000-0000-000000000000",
"AggregateId": "f543d76a-3895-48e2-a836-f09d4a00cd7f",
"Index": 1,
"Timestamp": "2022-03-07T15:29:19.942085+01:00",
"Amount": 100,
}]
Snapshots and Projections are stored similarly.
SQL is a bit less flexible when storing Events, Snapshots:
- Entity Framework Core Migrations have to be created and applied every time you create/update
Event
,Snapshot
andProjection
models. - Events and Snapshots are stored in a table per Aggregate Type using Table per Hierarchy.
- pro: querying is efficient, since all Events for a given Aggregate are in one table and no joins are needed to rehydrate an Aggregate.
- con: there will be redundant
NULL
columns when they are not applicable for a given Event Type.
The SQL Database representation of the Bank Account Events mentioned above will be:
PartitionId | AggregateId | Index | AggregateType | Type | Timestamp | Name | IBAN | Amount |
---|---|---|---|---|---|---|---|---|
00000000-0000-0000-0000-000000000000 | d85e6b59-add6-46bd-bae9-f7aa0f3140e5 | 0 | BankAccount | BankAccountCreatedEvent | 2022-04-19 12:16:41.213708 +00:00 | E. Vent | SOME IBAN | NULL |
00000000-0000-0000-0000-000000000000 | d85e6b59-add6-46bd-bae9-f7aa0f3140e5 | 1 | BankAccount | BankAccountFundsDepositedEvent | 2022-04-19 12:16:41.215338 +00:00 | NULL | NULL | 100 |
Projections are stored in a unique table per Projection
type.
To ensure data integrity in the context of Event Sourcing one has to:
- validate Events
- validate Events w.r.t. Aggregate State
While both validations can be done using C# code in e.g. the Aggregate.Apply
method,
EF Core adds the option to validate Events at database level using
Check Constraints,
Unique Constraints
Foreign Key Constraints
and AggregateReferences.
When developing applications, updates to Event models are bound to happen.
Depending on which database powers your EventSourcing (NoSQL Finaps.EventSourcing.Cosmos
or SQL Finaps.EventSourcing.EF
),
special care needs to be taken in order ensure backwards compatibility.
When updating Event models using the Finaps.EventSourcing.Cosmos
package,
all existing Events will remain the way they were written to the database initially.
Your code has to handle both the original as well as the updated Event models.
The following strategies can be used:
-
When adding properties to an Event record, consider making these properties nullable: this will ensure existing events without these properties are handled correctly in your application logic. You can also specify a default value for the property right on the Event record.
-
When removing properties from an Event model, no special care has to be taken, they will simply be ignored by the JSON conversion.
-
When drastically changing your Event model, consider making an entirely new Event model instead and handle both the old and the new in the
Aggregate.Apply
method. -
When changing data types, ensure that they map to the same json representation. Be very careful when doing this.
When updating Event models using the Finaps.EventSourcing.EF
package,
all existing Events will be updated with when applying Entity Framework Core Migrations.
Special care has to be taken to not change existing Event data in the database when changing Event models.
For SQL, most NoSQL strategies mentioned above are also applicable, however:
5When adding constraints, you can choose to validate them against all existing Events in the database, allowing you to reason over the validity of all Events as a whole.
TODO: Performance Testing & Metrics
For a more thorough example using the CosmosDB database, check out the Example Project and corresponding Example Tests.