Skip to content

Ability to resolve sagas via container #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nuget/CommonDomain.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<description>A domain project for quickly implementing CQRS functionality in domain models.</description>
<summary>A domain project for quickly implementing CQRS functionality in domain models.</summary>
<dependencies>
<dependency id="EventStore" version="3.0.11305.44" />
<dependency id="EventStore" version="3.1.0.9" />
</dependencies>
</metadata>
<files>
Expand Down
7 changes: 5 additions & 2 deletions src/proj/CommonDomain.Core/AggregateBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ protected void RaiseEvent(object @event)
}
void IAggregate.ApplyEvent(object @event)
{
this.RegisteredRoutes.Dispatch(@event);
this.Version++;
foreach (var item in ( ( @event is IEnumerable<object> ) ? @event as IEnumerable<object> : new []{ @event } ))
{
this.RegisteredRoutes.Dispatch(item);
this.Version++;
}
}
ICollection IAggregate.GetUncommittedEvents()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@
</ProjectReference>
</ItemGroup>
<ItemGroup>
<Reference Include="EventStore">
<HintPath>..\..\packages\EventStore.3.0.11305.44\lib\net40\EventStore.dll</HintPath>
<Reference Include="EventStore, Version=3.0.0.0, Culture=neutral, PublicKeyToken=7735eb81c0bd9948, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\..\_lib\EventStore.dll</HintPath>
</Reference>
<Reference Include="System" />
</ItemGroup>
Expand Down
146 changes: 58 additions & 88 deletions src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ namespace CommonDomain.Persistence.EventStore
using global::EventStore;
using global::EventStore.Persistence;

public class EventStoreRepository : IRepository, IDisposable
public class EventStoreRepository : IRepository
{
private const string AggregateTypeHeader = "AggregateType";
private readonly IDictionary<Guid, Snapshot> snapshots = new Dictionary<Guid, Snapshot>();
private readonly IDictionary<Guid, IEventStream> streams = new Dictionary<Guid, IEventStream>();
private readonly IStoreEvents eventStore;
private readonly IConstructAggregates factory;
private readonly IDetectConflicts conflictDetector;
Expand All @@ -25,40 +23,22 @@ public EventStoreRepository(
this.conflictDetector = conflictDetector;
}

public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing)
return;
public virtual TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate
{
return GetById<TAggregate>(id, int.MaxValue);
}

lock (this.streams)
{
foreach (var stream in this.streams)
stream.Value.Dispose();

this.snapshots.Clear();
this.streams.Clear();
}
}

public virtual TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate
{
return GetById<TAggregate>(id, int.MaxValue);
}

public virtual TAggregate GetById<TAggregate>(Guid id, int versionToLoad) where TAggregate : class, IAggregate
{
var snapshot = this.GetSnapshot(id, versionToLoad);
var stream = this.OpenStream(id, versionToLoad, snapshot);
var aggregate = this.GetAggregate<TAggregate>(snapshot, stream);
var snapshot = GetSnapshot(id, versionToLoad);
using (var stream = OpenStream(id, versionToLoad, snapshot))
{
var aggregate = GetAggregate<TAggregate>(snapshot, stream);

ApplyEventsToAggregate(versionToLoad, stream, aggregate);
ApplyEventsToAggregate(versionToLoad, stream, aggregate);

return aggregate as TAggregate;
return aggregate as TAggregate;
}
}
private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
{
Expand All @@ -69,75 +49,65 @@ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream strea
private IAggregate GetAggregate<TAggregate>(Snapshot snapshot, IEventStream stream)
{
var memento = snapshot == null ? null : snapshot.Payload as IMemento;
return this.factory.Build(typeof(TAggregate), stream.StreamId, memento);
return factory.Build(typeof(TAggregate), stream.StreamId, memento, stream.CommittedHeaders);
}
private Snapshot GetSnapshot(Guid id, int version)
{
Snapshot snapshot;
if (!this.snapshots.TryGetValue(id, out snapshot))
this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);

return snapshot;
return eventStore.Advanced.GetSnapshot(id, version);
}
private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
{
IEventStream stream;
if (this.streams.TryGetValue(id, out stream))
return stream;
return snapshot == null
? eventStore.OpenStream(id, 0, version)
: eventStore.OpenStream(snapshot, version);
}

stream = snapshot == null
? this.eventStore.OpenStream(id, 0, version)
: this.eventStore.OpenStream(snapshot, version);
public virtual void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var headers = PrepareHeaders(aggregate, updateHeaders);
while (true)
{
using (var stream = PrepareStream(aggregate, headers))
{
var commitEventCount = stream.CommittedEvents.Count;

return this.streams[id] = stream;
}
try
{
stream.CommitChanges(commitId);
aggregate.ClearUncommittedEvents();
return;
}
catch (DuplicateCommitException)
{
stream.ClearChanges();
return;
}
catch (ConcurrencyException e)
{
if (ThrowOnConflict(stream, commitEventCount))
throw new ConflictingCommandException(e.Message, e);

public virtual void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var headers = PrepareHeaders(aggregate, updateHeaders);
while (true)
{
var stream = this.PrepareStream(aggregate, headers);
var commitEventCount = stream.CommittedEvents.Count;

try
{
stream.CommitChanges(commitId);
aggregate.ClearUncommittedEvents();
return;
}
catch (DuplicateCommitException)
{
stream.ClearChanges();
return;
}
catch (ConcurrencyException e)
{
if (this.ThrowOnConflict(stream, commitEventCount))
throw new ConflictingCommandException(e.Message, e);

stream.ClearChanges();
}
catch (StorageException e)
{
throw new PersistenceException(e.Message, e);
}
}
stream.ClearChanges();
}
catch (StorageException e)
{
throw new PersistenceException(e.Message, e);
}
}
}
}
private IEventStream PrepareStream(IAggregate aggregate, Dictionary<string, object> headers)
{
IEventStream stream;
if (!this.streams.TryGetValue(aggregate.Id, out stream))
this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);
IEventStream stream = eventStore.OpenStream(aggregate.Id, 0, int.MaxValue);

foreach (var item in headers)
stream.UncommittedHeaders[item.Key] = item.Value;
aggregate.GetUncommittedEvents()
.Cast<object>()
.Select(x => new EventMessage { Body = x })
.ToList()
.ForEach(stream.Add);
stream.UncommittedHeaders[item.Key] = item.Value;

aggregate.GetUncommittedEvents()
.Cast<object>()
.Select(x => new EventMessage { Body = x })
.ToList()
.ForEach(stream.Add);

return stream;
}
Expand All @@ -155,7 +125,7 @@ private bool ThrowOnConflict(IEventStream stream, int skip)
{
var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
return this.conflictDetector.ConflictsWith(uncommitted, committed);
return conflictDetector.ConflictsWith(uncommitted, committed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ public class SagaEventStoreRepository : ISagaRepository, IDisposable
private const string UndispatchedMessageHeader = "UndispatchedMessage.";
private readonly IDictionary<Guid, IEventStream> streams = new Dictionary<Guid, IEventStream>();
private readonly IStoreEvents eventStore;
private readonly IConstructSagas sagaFactory;

public SagaEventStoreRepository(IStoreEvents eventStore)
public SagaEventStoreRepository(IStoreEvents eventStore, IConstructSagas sagaFactory)
{
this.eventStore = eventStore;
this.eventStore = eventStore;
this.sagaFactory = sagaFactory;
}

public void Dispose()
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
Expand All @@ -37,9 +39,9 @@ protected virtual void Dispose(bool disposing)
}
}

public TSaga GetById<TSaga>(Guid sagaId) where TSaga : class, ISaga, new()
public TSaga GetById<TSaga>(Guid sagaId) where TSaga : class, ISaga
{
return BuildSaga<TSaga>(this.OpenStream(sagaId));
return BuildSaga<TSaga>(sagaId, this.OpenStream(sagaId));
}
private IEventStream OpenStream(Guid sagaId)
{
Expand All @@ -59,9 +61,9 @@ private IEventStream OpenStream(Guid sagaId)
return this.streams[sagaId] = stream;
}

private static TSaga BuildSaga<TSaga>(IEventStream stream) where TSaga : class, ISaga, new()
private TSaga BuildSaga<TSaga>(Guid sagaId, IEventStream stream) where TSaga : class, ISaga
{
var saga = new TSaga();
var saga = sagaFactory.Build<TSaga>(sagaId);
foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
saga.Transition(@event);

Expand Down Expand Up @@ -128,7 +130,7 @@ private static void Persist(IEventStream stream, Guid commitId)
catch (StorageException e)
{
throw new PersistenceException(e.Message, e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
</Compile>
<Compile Include="ConflictingCommandException.cs" />
<Compile Include="IConstructAggregates.cs" />
<Compile Include="IConstructSagas.cs" />
<Compile Include="IRepository.cs" />
<Compile Include="ISagaRepository.cs" />
<Compile Include="PersistenceException.cs" />
Expand Down
4 changes: 3 additions & 1 deletion src/proj/CommonDomain.Persistence/IConstructAggregates.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Collections.Generic;

namespace CommonDomain.Persistence
{
using System;

public interface IConstructAggregates
{
IAggregate Build(Type type, Guid id, IMemento snapshot);
IAggregate Build(Type type, Guid id, IMemento snapshot, IDictionary<string, object> headers);
}
}
12 changes: 12 additions & 0 deletions src/proj/CommonDomain.Persistence/IConstructSagas.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace CommonDomain.Persistence
{
public interface IConstructSagas
{
TSaga Build<TSaga>(Guid id) where TSaga : ISaga;
}
}
2 changes: 1 addition & 1 deletion src/proj/CommonDomain.Persistence/ISagaRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace CommonDomain.Persistence

public interface ISagaRepository
{
TSaga GetById<TSaga>(Guid sagaId) where TSaga : class, ISaga, new();
TSaga GetById<TSaga>(Guid sagaId) where TSaga : class, ISaga;
void Save(ISaga saga, Guid commitId, Action<IDictionary<string, object>> updateHeaders);
}
}