Skip to content

EventStoreRepository GetById with Type at Runtime #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<Compile Include="SagaEventStoreRepository.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="TypeExtensions.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="..\CustomDictionary.xml">
Expand Down
237 changes: 120 additions & 117 deletions src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs
Original file line number Diff line number Diff line change
@@ -1,97 +1,100 @@
namespace CommonDomain.Persistence.EventStore
{
using System;
using System.Collections.Generic;
using System.Linq;
using global::EventStore;
using global::EventStore.Persistence;

public class EventStoreRepository : IRepository, IDisposable
{
private const string AggregateTypeHeader = "AggregateType";
private readonly IDictionary<Guid, Snapshot> snapshots = new Dictionary<Guid, Snapshot>();
private readonly IDictionary<Guid, IEventStream> streams = new Dictionary<Guid, IEventStream>();
private readonly IStoreEvents eventStore;
private readonly IConstructAggregates factory;
private readonly IDetectConflicts conflictDetector;

public EventStoreRepository(
IStoreEvents eventStore,
IConstructAggregates factory,
IDetectConflicts conflictDetector)
{
this.eventStore = eventStore;
this.factory = factory;
this.conflictDetector = conflictDetector;
}

public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing)
return;

lock (this.streams)
{
foreach (var stream in this.streams)
stream.Value.Dispose();

this.snapshots.Clear();
this.streams.Clear();
}
namespace CommonDomain.Persistence.EventStore
{
using System;
using System.Collections.Generic;
using System.Linq;
using global::EventStore;
using global::EventStore.Persistence;
public class EventStoreRepository : IRepository, IDisposable
{
private const string AggregateTypeHeader = "AggregateType";
private readonly IDictionary<Guid, Snapshot> snapshots = new Dictionary<Guid, Snapshot>();
private readonly IDictionary<Guid, IEventStream> streams = new Dictionary<Guid, IEventStream>();
private readonly IStoreEvents eventStore;
private readonly IConstructAggregates factory;
private readonly IDetectConflicts conflictDetector;
public EventStoreRepository(
IStoreEvents eventStore,
IConstructAggregates factory,
IDetectConflicts conflictDetector)
{
this.eventStore = eventStore;
this.factory = factory;
this.conflictDetector = conflictDetector;
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing)
return;
lock (this.streams)
{
foreach (var stream in this.streams)
stream.Value.Dispose();
this.snapshots.Clear();
this.streams.Clear();
}
}

public virtual TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate
public virtual IAggregate GetById(Type aggregateType, Guid id)
{
return GetById<TAggregate>(id, int.MaxValue);
return GetById(aggregateType, id, int.MaxValue);
}

public virtual TAggregate GetById<TAggregate>(Guid id, int versionToLoad) where TAggregate : class, IAggregate
{
var snapshot = this.GetSnapshot(id, versionToLoad);
var stream = this.OpenStream(id, versionToLoad, snapshot);
var aggregate = this.GetAggregate<TAggregate>(snapshot, stream);

ApplyEventsToAggregate(versionToLoad, stream, aggregate);

return aggregate as TAggregate;
}
private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
{
if (versionToLoad == 0 || aggregate.Version < versionToLoad)
foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
aggregate.ApplyEvent(@event);
}
private IAggregate GetAggregate<TAggregate>(Snapshot snapshot, IEventStream stream)
{
var memento = snapshot == null ? null : snapshot.Payload as IMemento;
return this.factory.Build(typeof(TAggregate), stream.StreamId, memento);
}
private Snapshot GetSnapshot(Guid id, int version)
{
Snapshot snapshot;
if (!this.snapshots.TryGetValue(id, out snapshot))
this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);

return snapshot;
}
private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
{
IEventStream stream;
if (this.streams.TryGetValue(id, out stream))
return stream;

stream = snapshot == null
? this.eventStore.OpenStream(id, 0, version)
: this.eventStore.OpenStream(snapshot, version);

return this.streams[id] = stream;
}

public virtual IAggregate GetById(Type aggregateType, Guid id, int versionToLoad)
{
var snapshot = this.GetSnapshot(id, versionToLoad);
var stream = this.OpenStream(id, versionToLoad, snapshot);
var aggregate = this.GetAggregate(aggregateType, snapshot, stream);

ApplyEventsToAggregate(versionToLoad, stream, aggregate);

return aggregate;
}
private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
{
if (versionToLoad == 0 || aggregate.Version < versionToLoad)
foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
aggregate.ApplyEvent(@event);
}
private IAggregate GetAggregate(Type aggregateType, Snapshot snapshot, IEventStream stream)
{
if (aggregateType == null) throw new ArgumentNullException("aggregateType");
if (!aggregateType.Implements(typeof(IAggregate))) throw new ArgumentException(ExceptionMessages.NotAggregateType, "aggregateType");

var memento = snapshot == null ? null : snapshot.Payload as IMemento;
return this.factory.Build(aggregateType, stream.StreamId, memento);
}
private Snapshot GetSnapshot(Guid id, int version)
{
Snapshot snapshot;
if (!this.snapshots.TryGetValue(id, out snapshot))
this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);

return snapshot;
}
private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
{
IEventStream stream;
if (this.streams.TryGetValue(id, out stream))
return stream;

stream = snapshot == null
? this.eventStore.OpenStream(id, 0, version)
: this.eventStore.OpenStream(snapshot, version);

return this.streams[id] = stream;
}

public virtual void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var headers = PrepareHeaders(aggregate, updateHeaders);
Expand Down Expand Up @@ -123,39 +126,39 @@ public virtual void Save(IAggregate aggregate, Guid commitId, Action<IDictionary
throw new PersistenceException(e.Message, e);
}
}
}
private IEventStream PrepareStream(IAggregate aggregate, Dictionary<string, object> headers)
{
IEventStream stream;
if (!this.streams.TryGetValue(aggregate.Id, out stream))
this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);

foreach (var item in headers)
}
private IEventStream PrepareStream(IAggregate aggregate, Dictionary<string, object> headers)
{
IEventStream stream;
if (!this.streams.TryGetValue(aggregate.Id, out stream))
this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);
foreach (var item in headers)
stream.UncommittedHeaders[item.Key] = item.Value;

aggregate.GetUncommittedEvents()
.Cast<object>()
.Select(x => new EventMessage { Body = x })
.ToList()
.ForEach(stream.Add);

return stream;
}
private static Dictionary<string, object> PrepareHeaders(IAggregate aggregate, Action<IDictionary<string, object>> updateHeaders)
{
var headers = new Dictionary<string, object>();

headers[AggregateTypeHeader] = aggregate.GetType().FullName;
if (updateHeaders != null)
updateHeaders(headers);

return headers;
}
private bool ThrowOnConflict(IEventStream stream, int skip)
{
var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
return this.conflictDetector.ConflictsWith(uncommitted, committed);
}
}
return stream;
}
private static Dictionary<string, object> PrepareHeaders(IAggregate aggregate, Action<IDictionary<string, object>> updateHeaders)
{
var headers = new Dictionary<string, object>();
headers[AggregateTypeHeader] = aggregate.GetType().FullName;
if (updateHeaders != null)
updateHeaders(headers);
return headers;
}
private bool ThrowOnConflict(IEventStream stream, int skip)
{
var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
return this.conflictDetector.ConflictsWith(uncommitted, committed);
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
<data name="ConflictingCommand" xml:space="preserve">
<value>The command issued conflicted with another command that was sent by another user, actor, or process in the system. The change could not be automatically merged. Please review the data that has changed and try your change again.</value>
</data>
<data name="NotAggregateType" xml:space="preserve">
<value>The argument must implement IAggregate.</value>
</data>
<data name="NoWork" xml:space="preserve">
<value>There were no uncommitted changes to persist. When attempting to save an aggregate there must be at least one uncommitted event to persist.</value>
</data>
Expand Down
17 changes: 17 additions & 0 deletions src/proj/CommonDomain.Persistence.EventStore/TypeExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Linq;

namespace CommonDomain.Persistence.EventStore
{
public static class TypeExtensions
{
public static Boolean Implements(this Type type, Type interfaceType)
{
return interfaceType != null &&
type != null &&
!type.IsAbstract &&
type.IsClass &&
type.GetInterfaces().Any(item => interfaceType.IsGenericTypeDefinition ? item.IsGenericType && item.GetGenericTypeDefinition() == interfaceType : item == interfaceType);
}
}
}
6 changes: 3 additions & 3 deletions src/proj/CommonDomain.Persistence/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ namespace CommonDomain.Persistence
using System.Collections.Generic;

public interface IRepository
{
TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate;
TAggregate GetById<TAggregate>(Guid id, int version) where TAggregate : class, IAggregate;
{
IAggregate GetById(Type aggregateType, Guid id);
IAggregate GetById(Type aggregateType, Guid id, int version);
void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders);
}
}
20 changes: 15 additions & 5 deletions src/proj/CommonDomain.Persistence/RepositoryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@

namespace CommonDomain.Persistence
{
public static class RepositoryExtensions
public static class RepositoryExtensions
{
public static TAggregate GetById<TAggregate>(this IRepository repository, Guid id) where TAggregate : class, IAggregate
{
public static void Save(this IRepository repository, IAggregate aggregate, Guid commitId)
{
repository.Save(aggregate, commitId, a => {});
}
return repository.GetById(typeof(TAggregate), id, int.MaxValue) as TAggregate;
}

public static TAggregate GetById<TAggregate>(this IRepository repository, Guid id, int versionToLoad) where TAggregate : class, IAggregate
{
return repository.GetById(typeof(TAggregate), id, versionToLoad) as TAggregate;
}

public static void Save(this IRepository repository, IAggregate aggregate, Guid commitId)
{
repository.Save(aggregate, commitId, a => { });
}
}
}