Skip to content

kostiantyn-matsebora/streamstore

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

71 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

StreamStore

Build Quality Gate Status Coverage NuGet version (StreamStore)

Asynchronous event sourcing.

Library provides a logical layer for storing and querying events as a stream.

Heavily inspired by Greg Young's Event Store and Streamstone solutions.

Overview

Designed to be easily extended with custom database backends. Despite the fact that component implements a logical layer for storing and querying events as a stream, it does not provide functionality of DDD aggregate, such as state mutation, conflict resolution etc., but serves more as persistence layer for it.

Storage packages

Package Description Multitenancy Package
StreamStore.Sql.PostgreSql PostgreSQL implementation âś… NuGet version (StreamStore.Sql.PostgreSql)
StreamStore.Sql.Sqlite SQLite implementation âś… NuGet version (StreamStore.Sql.Sqlite)
StreamStore.InMemory In-memory implementation is provided for testing and educational purposes only âś… NuGet version (StreamStore.InMemory)
StreamStore.S3.AWS Amazon S3 implementation ❌ NuGet version (StreamStore.S3.AWS)
StreamStore.S3.B2 Backblaze B2 implementation ❌ NuGet version (StreamStore.S3.B2)

Features

The general idea is to highlight the common characteristics and features of event sourcing storage:

  • Asynchronous read and write operations.
  • Multitenancy support.
  • Automatic provisioning of storage schema.
  • Event ordering.
  • Serialization/deserialization of events.
  • Optimistic concurrency control.
  • Event duplication detection based on event ID.
  • Database agnostic test framework, including benchmarking test scenarios.
  • Binary serialization support.

Storages

Also add implementations of particular storage backends, such as:

Roadmap

  • Custom event properties (?).
  • External transaction support (?).
  • Transactional outbox pattern implementation (?).

Installation

To install the package, you can use the following command from the command line:

  # Install StreamStore package
  
  dotnet add package StreamStore

  # Install package of particular database implementation, for instance InMemory

  dotnet add package StreamStore.InMemory

or from NuGet Package Manager Console:

   # Install StreamStore package
  Install-Package StreamStore

   # Install package of particular database implementation, for instance SQLite database backend
  Install-Package StreamStore.Sql.Sqlite

Usage

  • Register store in DI container
    services.ConfigureStreamStore(x =>              // Register StreamStore
      x.EnableSchemaProvisioning()                  // Optional. Enable schema provisioning, default: false.
      
      // Register single database implementation, see details in documentation for particular database
      x.WithSingleDatabase(c =>                 
          c.UseSqliteDatabase(x =>                  // For instance, SQLite database backend
              x.ConfigureDatabase(c =>
                c.WithConnectionString(connectionString)
              )
          )
      )
      // Or enable multitenancy, see details in documentation for particular database.
      x.WithMultitenancy(c => 
          c.UseInMemoryDatabase()                   // For instance, InMemory database backend
           .UseTenantProvider<MyTenantProvider>()   // Optional. Register your  ITenantProvider implementation.
                                                    // Required if you want schema to be provisioned for each tenant.
      )
    ); 
  • Use store in your application
   // Inject IStreamStore in your service or controller for single database implementation
    public class MyService
    {
        private readonly IStreamStore store;
  
        public MyService(IStreamStore store)
        {
            this.store = store;
        }
    }
 
  // Or IStreamStoreFactory for multitenancy
    public class MyService
    {
        private readonly IStreamStoreFactory storeFactory;
  
        public MyService(IStreamStoreFactory storeFactory)
        {
            this.storeFactory = storeFactory;
        }
    }

  // Append events to stream or create a new stream if it does not exist
  // EventObject property is where you store your event
  var events = new Event[]  {
        new Event { Id = "event-1", Timestamp = DateTime.Now, EventObject = eventObject } 
        ...
      };

  try {
    store
      .BeginWriteAsync("stream-1")       // Open stream like new since revision is not provided
         .AppendEventAsync(x =>          // Append events one by one using fluent API
            x.WithId("event-3")
             .Dated(DateTime.Now)
             .WithEvent(eventObject)
         )
        ...
        .AppendRangeAsync(events)       // Or append range of events by passing IEnumerable<Event>
      .CommitAsync(token);

  } catch (StreamConcurrencyException ex) {

    // Read from stream and implement your logic for handling optimistic concurrency exception
    await foreach(var @event in await store.BeginReadAsync("stream-1", token)) {
        ...
    }
    
    // Push result to the end of stream
    store
        .BeginWriteAsync("stream-1", ex.ActualRevision)
           .AppendEventAsync(x =>                // Append events one by one using fluent API
            x.WithId( "event-4")
             .Dated(DateTime.Now)
             .WithEvent(yourEventObject)
           )
        ...
        .CommitAsync(streamId);
  } catch (StreamLockedException ex) {
    // Some database backends like S3 do not support optimistic concurrency control
    // So, the only way to handle concurrency is to lock the stream
  }

More examples of reading and writing events you can find in test scenarios of StreamStore.Testing project.

Example

Each type of storage has its own example project, for instance, you can find an example of usage in the StreamStore.Sql.Example project.

Example projects provides a simple console application that demonstrates how to configure and use StreamStore in your application as single database or multitenancy.

Single database examples demonstrates:

  • optimistic concurrency control
  • asynchronous reading and writing operations

Multitenancy examples, in turn, demonstrates asynchronous reading and writing operations in isolated tenant storage.

For getting all running options simply run the application with --help argument.

Good to know

  • Id is a value object (immutable class) that has implicit conversion from and to string.

    Thus you don't need to create Id object explicitly and use ToString() to convert to string back.
    Also implements IEquatable for itself and for String.

  • Revision is a value object (immutable class) that represents a revision of the stream.
    It is used for optimistic concurrency control and event ordering. It has implicit conversion from and to Int32 type.
    Also implements IEquatable and IComparable for itself and for Int32.

  • You can read from any stream starting from the provided revision.

  • ReadToEnd method returns collection of events from the stream starting from the provided revision:

    • Contains only unique events ordered by revision.
    • Contains only events that were committed.
  • Stream revision is always the revision of an event with maximum revision value.

  • Idempotency of reading and deletion fully depends on particular database implementation.

  • You don't need to retrieve stream to add events to it.
    Appending to stream and getting stream are separate operations.

  • Despite the fact that reading is declared as asynchronous and iterative operation, for the sake of performance it is implemented as paginated operation.

    You can define the page size by using WithReadingPageSize method of store configuration, by default it is 10 events.

  • Reading and writing operations are not thread-safe.
    Thus, it is not recommended to use the same instances of IStreamWriter or IAsyncEnumerable<StreamEvent> in multiple threads simultaneously.

Customization

To implement your own database you do not need StreamStore package, all necessary interfaces are located in StreamStore.Contracts package from command line:

  dotnet add package StreamStore.Contracts

or from NuGet Package Manager Console:

  Install-Package StreamStore.Contracts

Serialization

About serialization you can read in SERIALIZATION.md file.

Create your own database implementation

To create your own database implementation, you need to implement the following interfaces:

  • IStreamDatabase - provides methods for working with streams. Create your own implementation based on StreamDatabaseBase abstract class.

  • IStreamUnitOfWork - provides methods for appending events to the stream and saving changes.
    Create your own implementation based on StreamUnitOfWorkBase and override following methods:

      class MyStreamUnitOfWork: StreamUnitOfWorkBase
      {
        protected override Task SaveChangesAsync(EventRecordCollection uncommited, CancellationToken token)
        {
          // Implement saving logic
        }
    
        protected override Task OnEventAdded(EventRecord @event, CancellationToken token)
        {
              // Optionally implement logic for handling event added, 
              // such as instance logging, puting event to outbox or temporary storage etc.
        }
    
       protected override void Dispose(bool disposing)
       {
          // Optionally implement disposing logic
       }
      }

    Default serializer is using Newtonsoft.Json library, so you can create your own using System.Text.Json or any other, by implementing IEventSerializer interface.

Considerations

  • To implement your own database you do not need StreamStore package, all necessary interfaces are located in StreamStore.Contracts package.

  • You can register your own database implementation in the DI container using any kind of lifetime (i.e. Singleton, Transient, Scoped, etc.)

    However, if you register it as a singleton, you should be aware that it should be thread-safe and preferably stateless.

  • Solution already provides optimistic concurrency and event duplication control mechanisms, as a pre-check during stream opening.

    However, if you need consistency guaranteed, you should implement your own mechanisms as a part of IStreamUnitOfWork implementation.
    For instance, you can use a transaction mechanism supported by ACID compliant DBMS.

  • Get and Delete operations must be implemented as idempotent by their nature.

Contributing

If you experience any issues, have a question or a suggestion, or if you wish to contribute, feel free to open an issue or start a discussion.

License

MIT License