Asynchronous event sourcing.
Library provides a logical layer for storing and querying events as a stream.
Heavily inspired by Greg Young's Event Store and Streamstone
solutions.
Designed to be easily extended with custom database backends.
Despite the fact that component implements a logical layer for storing and querying events as a stream,
it does not provide functionality of DDD aggregate
, such as state mutation, conflict resolution etc., but serves more as persistence layer
for it.
Package | Description | Multitenancy | Package |
---|---|---|---|
StreamStore.Sql.PostgreSql | PostgreSQL implementation |
âś… | |
StreamStore.Sql.Sqlite | SQLite implementation |
âś… | |
StreamStore.InMemory | In-memory implementation is provided for testing and educational purposes only |
âś… | |
StreamStore.S3.AWS | Amazon S3 implementation |
❌ | |
StreamStore.S3.B2 | Backblaze B2 implementation |
❌ |
The general idea is to highlight the common characteristics and features of event sourcing storage:
- Asynchronous read and write operations.
- Multitenancy support.
- Automatic provisioning of storage schema.
- Event ordering.
- Serialization/deserialization of events.
- Optimistic concurrency control.
- Event duplication detection based on event ID.
- Database agnostic test framework, including benchmarking test scenarios.
- Binary serialization support.
Also add implementations of particular storage backends, such as:
-
In-Memory
- for testing purposes. -
Binary Object
storages:-
Backblaze B2
- Backblaze B2. -
Amazon S3
- Amazon S3.
-
-
SQL
based DBMS: -
NoSQL
based DBMS:-
Cassandra DB
- distributed storage.
-
- Custom event properties (?).
- External transaction support (?).
- Transactional outbox pattern implementation (?).
To install the package, you can use the following command from the command line:
# Install StreamStore package
dotnet add package StreamStore
# Install package of particular database implementation, for instance InMemory
dotnet add package StreamStore.InMemory
or from NuGet Package Manager Console:
# Install StreamStore package
Install-Package StreamStore
# Install package of particular database implementation, for instance SQLite database backend
Install-Package StreamStore.Sql.Sqlite
- Register store in DI container
services.ConfigureStreamStore(x => // Register StreamStore
x.EnableSchemaProvisioning() // Optional. Enable schema provisioning, default: false.
// Register single database implementation, see details in documentation for particular database
x.WithSingleDatabase(c =>
c.UseSqliteDatabase(x => // For instance, SQLite database backend
x.ConfigureDatabase(c =>
c.WithConnectionString(connectionString)
)
)
)
// Or enable multitenancy, see details in documentation for particular database.
x.WithMultitenancy(c =>
c.UseInMemoryDatabase() // For instance, InMemory database backend
.UseTenantProvider<MyTenantProvider>() // Optional. Register your ITenantProvider implementation.
// Required if you want schema to be provisioned for each tenant.
)
);
- Use store in your application
// Inject IStreamStore in your service or controller for single database implementation
public class MyService
{
private readonly IStreamStore store;
public MyService(IStreamStore store)
{
this.store = store;
}
}
// Or IStreamStoreFactory for multitenancy
public class MyService
{
private readonly IStreamStoreFactory storeFactory;
public MyService(IStreamStoreFactory storeFactory)
{
this.storeFactory = storeFactory;
}
}
// Append events to stream or create a new stream if it does not exist
// EventObject property is where you store your event
var events = new Event[] {
new Event { Id = "event-1", Timestamp = DateTime.Now, EventObject = eventObject }
...
};
try {
store
.BeginWriteAsync("stream-1") // Open stream like new since revision is not provided
.AppendEventAsync(x => // Append events one by one using fluent API
x.WithId("event-3")
.Dated(DateTime.Now)
.WithEvent(eventObject)
)
...
.AppendRangeAsync(events) // Or append range of events by passing IEnumerable<Event>
.CommitAsync(token);
} catch (StreamConcurrencyException ex) {
// Read from stream and implement your logic for handling optimistic concurrency exception
await foreach(var @event in await store.BeginReadAsync("stream-1", token)) {
...
}
// Push result to the end of stream
store
.BeginWriteAsync("stream-1", ex.ActualRevision)
.AppendEventAsync(x => // Append events one by one using fluent API
x.WithId( "event-4")
.Dated(DateTime.Now)
.WithEvent(yourEventObject)
)
...
.CommitAsync(streamId);
} catch (StreamLockedException ex) {
// Some database backends like S3 do not support optimistic concurrency control
// So, the only way to handle concurrency is to lock the stream
}
More examples of reading and writing events you can find in test scenarios of StreamStore.Testing project.
Each type of storage has its own example project, for instance, you can find an example of usage in the StreamStore.Sql.Example project.
Example projects provides a simple console application that demonstrates how to configure and use StreamStore
in your application as single database or multitenancy.
Single database
examples demonstrates:
- optimistic concurrency control
- asynchronous reading and writing operations
Multitenancy
examples, in turn, demonstrates asynchronous reading and writing operations in isolated tenant storage.
For getting all running options simply run the application with --help
argument.
-
Id
is a value object (immutable class) that has implicit conversion from and to string.Thus you don't need to create Id object explicitly and use
ToString()
to convert to string back.
Also implementsIEquatable
for itself and forString
. -
Revision
is a value object (immutable class) that represents a revision of the stream.
It is used for optimistic concurrency control and event ordering. It has implicit conversion from and toInt32
type.
Also implementsIEquatable
andIComparable
for itself and forInt32
. -
You can read from any stream starting from the provided revision.
-
ReadToEnd
method returns collection of events from the stream starting from the provided revision:- Contains only unique events ordered by revision.
- Contains only events that were committed.
-
Stream revision is always the revision of an event with maximum revision value.
-
Idempotency of reading and deletion fully depends on particular database implementation.
-
You don't need to retrieve stream to add events to it.
Appending to stream and getting stream are separate operations. -
Despite the fact that reading is declared as asynchronous and iterative operation, for the sake of performance it is implemented as paginated operation.
You can define the page size by using
WithReadingPageSize
method of store configuration, by default it is 10 events. -
Reading and writing operations are not thread-safe.
Thus, it is not recommended to use the same instances ofIStreamWriter
orIAsyncEnumerable<StreamEvent>
in multiple threads simultaneously.
To implement your own database you do not need StreamStore package, all necessary interfaces are located in StreamStore.Contracts package from command line:
dotnet add package StreamStore.Contracts
or from NuGet Package Manager Console:
Install-Package StreamStore.Contracts
About serialization you can read in SERIALIZATION.md file.
To create your own database implementation, you need to implement the following interfaces:
-
IStreamDatabase
- provides methods for working with streams. Create your own implementation based onStreamDatabaseBase
abstract class. -
IStreamUnitOfWork
- provides methods for appending events to the stream and saving changes.
Create your own implementation based onStreamUnitOfWorkBase
and override following methods:class MyStreamUnitOfWork: StreamUnitOfWorkBase { protected override Task SaveChangesAsync(EventRecordCollection uncommited, CancellationToken token) { // Implement saving logic } protected override Task OnEventAdded(EventRecord @event, CancellationToken token) { // Optionally implement logic for handling event added, // such as instance logging, puting event to outbox or temporary storage etc. } protected override void Dispose(bool disposing) { // Optionally implement disposing logic } }
Default serializer is using
Newtonsoft.Json
library, so you can create your own usingSystem.Text.Json
or any other, by implementingIEventSerializer
interface.
-
To implement your own database you do not need StreamStore package, all necessary interfaces are located in StreamStore.Contracts package.
-
You can register your own database implementation in the DI container using any kind of lifetime (i.e. Singleton, Transient, Scoped, etc.)
However, if you register it as a singleton, you should be aware that it should be thread-safe and preferably stateless.
-
Solution already provides optimistic concurrency and event duplication control mechanisms, as a pre-check during stream opening.
However, if you need consistency guaranteed, you should implement your own mechanisms as a part of IStreamUnitOfWork implementation.
For instance, you can use a transaction mechanism supported byACID compliant DBMS
. -
Get and Delete operations must be implemented as idempotent by their nature.
If you experience any issues, have a question or a suggestion, or if you wish to contribute, feel free to open an issue or start a discussion.