Skip to content

Commit

Permalink
Fixed the issue with Any expected state for func services (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev authored Aug 28, 2023
1 parent 79329a7 commit 330506f
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

using static Eventuous.CommandServiceDelegates;

namespace Eventuous;

public abstract class CommandHandlerBuilder<TAggregate, TState, TId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,12 @@
// Licensed under the Apache License, Version 2.0.

using System.Reflection;
using static Eventuous.CommandServiceDelegates;

namespace Eventuous;

using static Diagnostics.ApplicationEventSource;

public delegate Task ActOnAggregateAsync<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command, CancellationToken cancellationToken)
where TAggregate : Aggregate;

public delegate void ActOnAggregate<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command) where TAggregate : Aggregate;

delegate ValueTask<T> HandleUntypedCommand<T>(T aggregate, object command, CancellationToken cancellationToken) where T : Aggregate;

public delegate Task<TId> GetIdFromCommandAsync<TId, in TCommand>(TCommand command, CancellationToken cancellationToken) where TId : Id where TCommand : class;

public delegate TId GetIdFromCommand<out TId, in TCommand>(TCommand command) where TId : Id where TCommand : class;

delegate ValueTask<TId> GetIdFromUntypedCommand<TId>(object command, CancellationToken cancellationToken) where TId : Id;

public delegate IAggregateStore ResolveStore<in TCommand>(TCommand command) where TCommand : class;

delegate IAggregateStore ResolveStoreFromCommand(object command);

record RegisteredHandler<T, TId>(
ExpectedState ExpectedState,
GetIdFromUntypedCommand<TId> GetId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

using static Eventuous.CommandServiceDelegates;

namespace Eventuous;

static class CommandHandlingDelegateExtensions {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using static Eventuous.CommandServiceDelegates;

namespace Eventuous;

public abstract partial class CommandService<TAggregate, TState, TId> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using static Eventuous.CommandServiceDelegates;

namespace Eventuous;

public abstract partial class CommandService<TAggregate, TState, TId> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

public static class CommandServiceDelegates {
public delegate Task ActOnAggregateAsync<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command, CancellationToken cancellationToken)
where TAggregate : Aggregate;

public delegate void ActOnAggregate<in TAggregate, in TCommand>(TAggregate aggregate, TCommand command) where TAggregate : Aggregate;

internal delegate ValueTask<T> HandleUntypedCommand<T>(T aggregate, object command, CancellationToken cancellationToken) where T : Aggregate;

public delegate Task<TId> GetIdFromCommandAsync<TId, in TCommand>(TCommand command, CancellationToken cancellationToken)
where TId : Id where TCommand : class;

public delegate TId GetIdFromCommand<out TId, in TCommand>(TCommand command) where TId : Id where TCommand : class;

internal delegate ValueTask<TId> GetIdFromUntypedCommand<TId>(object command, CancellationToken cancellationToken) where TId : Id;

public delegate IAggregateStore ResolveStore<in TCommand>(TCommand command) where TCommand : class;

internal delegate IAggregateStore ResolveStoreFromCommand(object command);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,5 @@ void AddHandlerInternal<TCommand>(RegisteredFuncHandler<TState> handler) where T
}
}

public void AddHandler<TCommand>(
ExpectedState expectedState,
GetStreamNameFromCommand<TCommand> getStreamName,
ExecuteCommand<TState, TCommand> action,
ResolveReaderFromCommand<TCommand> resolveReaderFromCommand,
ResolveWriterFromCommand<TCommand> resolveWriterFromCommand
)
where TCommand : class
=> AddHandlerInternal<TCommand>(
new RegisteredFuncHandler<TState>(
expectedState,
getStreamName.AsGetStream(),
action.AsExecute(),
resolveReaderFromCommand.AsResolveReader(),
resolveWriterFromCommand.AsResolveWriter()
)
);

public bool TryGet<TCommand>([NotNullWhen(true)] out RegisteredFuncHandler<TState>? handler) => _typeMap.TryGetValue<TCommand>(out handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ namespace Eventuous;

public abstract class FunctionalCommandService<TState>(IEventReader reader, IEventWriter writer, TypeMapper? typeMap = null)
: IFuncCommandService<TState>, IStateCommandService<TState> where TState : State<TState>, new() {
[PublicAPI]
protected IEventReader Reader { get; } = reader;
[PublicAPI]
protected IEventWriter Writer { get; } = writer;

readonly TypeMapper _typeMap = typeMap ?? TypeMap.Instance;
readonly FuncHandlersMap<TState> _handlers = new();

Expand All @@ -28,7 +23,7 @@ protected FunctionalCommandService(IEventStore store, TypeMapper? typeMap = null
/// <typeparam name="TCommand">Command type</typeparam>
/// <returns></returns>
protected FuncCommandHandlerBuilder<TCommand, TState> On<TCommand>() where TCommand : class {
var builder = new FuncCommandHandlerBuilder<TCommand, TState>(Reader, Writer);
var builder = new FuncCommandHandlerBuilder<TCommand, TState>(reader, writer);
_builders.Add(typeof(TCommand), builder);

return builder;
Expand Down Expand Up @@ -57,11 +52,13 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
}

var streamName = await registeredHandler.GetStream(command, cancellationToken).NoContext();
var reader = registeredHandler.ResolveReaderFromCommand(command);
var writer = registeredHandler.ResolveWriterFromCommand(command);

try {
var loadedState = registeredHandler.ExpectedState switch {

Check notice on line 59 in src/Core/src/Eventuous.Application/FunctionalService/FunctionalCommandService.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Some values of the enum are not processed inside 'switch' expression and are handled via exception in default arm

Some values of the enum are not processed inside switch: Unknown

Check notice on line 59 in src/Core/src/Eventuous.Application/FunctionalService/FunctionalCommandService.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Some values of the enum are not processed inside 'switch' expression and are handled via exception in default arm

Some values of the enum are not processed inside switch: Unknown
ExpectedState.Any => await Reader.LoadStateOrNew<TState>(streamName, cancellationToken).NoContext(),
ExpectedState.Existing => await Reader.LoadState<TState>(streamName, cancellationToken).NoContext(),
ExpectedState.Any => await reader.LoadStateOrNew<TState>(streamName, cancellationToken).NoContext(),
ExpectedState.Existing => await reader.LoadState<TState>(streamName, cancellationToken).NoContext(),
ExpectedState.New => new FoldedEventStream<TState>(streamName, ExpectedStreamVersion.NoStream, Array.Empty<object>()),
_ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state")
};
Expand All @@ -76,7 +73,7 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
// Zero in the global position would mean nothing, so the receiver need to check the Changes.Length
if (newEvents.Length == 0) return new OkResult<TState>(newState, Array.Empty<Change>(), 0);

var storeResult = await Writer.Store(streamName, (int)loadedState.StreamVersion.Value, newEvents, static e => e, cancellationToken).NoContext();
var storeResult = await writer.Store(streamName, (int)loadedState.StreamVersion.Value, newEvents, static e => e, cancellationToken).NoContext();
var changes = newEvents.Select(x => new Change(x, _typeMap.GetTypeName(x)));
Log.CommandHandled<TCommand>();

Expand Down
49 changes: 24 additions & 25 deletions src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace Eventuous;

public static class StoreFunctions {
public static async Task<AppendEventsResult> Store(
this IEventWriter eventWriter,
StreamName streamName,
int originalVersion,
IReadOnlyCollection<object> changes,
Func<StreamEvent, StreamEvent> amendEvent,
CancellationToken cancellationToken
) {
this IEventWriter eventWriter,
StreamName streamName,
int originalVersion,
IReadOnlyCollection<object> changes,
Func<StreamEvent, StreamEvent> amendEvent,
CancellationToken cancellationToken
) {
Ensure.NotNull(changes);

if (changes.Count == 0) return AppendEventsResult.NoOp;
Expand All @@ -30,44 +30,44 @@ CancellationToken cancellationToken
.NoContext();

return result;
}
catch (Exception e) {
} catch (Exception e) {
throw e.InnerException?.Message.Contains("WrongExpectedVersion") == true
? new OptimisticConcurrencyException(streamName, e)
: e;
}

StreamEvent ToStreamEvent(object evt, int position) {
var streamEvent = new StreamEvent(Guid.NewGuid(), evt, new Metadata(), "", position);

return amendEvent(streamEvent);
}
}

public static async Task<AppendEventsResult> Store<T>(
this IEventWriter eventWriter,
StreamName streamName,
T aggregate,
Func<StreamEvent, StreamEvent> amendEvent,
CancellationToken cancellationToken
) where T : Aggregate {
this IEventWriter eventWriter,
StreamName streamName,
T aggregate,
Func<StreamEvent, StreamEvent> amendEvent,
CancellationToken cancellationToken
) where T : Aggregate {
Ensure.NotNull(aggregate);

try {
return await eventWriter.Store(streamName, aggregate.OriginalVersion, aggregate.Changes, amendEvent, cancellationToken).NoContext();
}
catch (OptimisticConcurrencyException e) {
} catch (OptimisticConcurrencyException e) {
Log.UnableToStoreAggregate<T>(streamName, e);

throw new OptimisticConcurrencyException<T>(streamName, e.InnerException!);
}
}

public static async Task<StreamEvent[]> ReadStream(
this IEventReader eventReader,
StreamName streamName,
StreamReadPosition start,
bool failIfNotFound,
CancellationToken cancellationToken
) {
this IEventReader eventReader,
StreamName streamName,
StreamReadPosition start,
bool failIfNotFound,
CancellationToken cancellationToken
) {
const int pageSize = 500;

var streamEvents = new List<StreamEvent>();
Expand All @@ -83,8 +83,7 @@ CancellationToken cancellationToken

position = new StreamReadPosition(position.Value + events.Length);
}
}
catch (StreamNotFound) when (!failIfNotFound) {
} catch (StreamNotFound) when (!failIfNotFound) {
return Array.Empty<StreamEvent>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ CancellationToken cancellationToken
try {
var streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext();
var events = streamEvents.Select(x => x.Payload!).ToArray();
return (new FoldedEventStream<T>(streamName, new ExpectedStreamVersion(streamEvents.Last().Position), events));
var expectedVersion = events.Length == 0
? ExpectedStreamVersion.NoStream
: new ExpectedStreamVersion(streamEvents.Last().Position);
return (new FoldedEventStream<T>(streamName, expectedVersion, events));
}
catch (StreamNotFound) when (!failIfNotFound) {
return new FoldedEventStream<T>(streamName, ExpectedStreamVersion.NoStream, Array.Empty<object>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public BookingFuncService(IEventStore store, TypeMapper? typeMap = null)
OnNew<BookRoom>(cmd => GetStream(cmd.BookingId), BookRoom);
#pragma warning restore CS0618 // Type or member is obsolete
On<RecordPayment>().InState(ExpectedState.Existing).GetStream(cmd => GetStream(cmd.BookingId)).Act(RecordPayment);
On<ImportBooking>().InState(ExpectedState.Any).GetStream(cmd => GetStream(cmd.BookingId)).Act(ImportBooking);

return;

Expand All @@ -24,6 +25,10 @@ static IEnumerable<object> BookRoom(BookRoom cmd) {
yield return new RoomBooked(cmd.RoomId, cmd.CheckIn, cmd.CheckOut, cmd.Price);
}

static IEnumerable<object> ImportBooking(BookingState state, object[] events, ImportBooking cmd) {
yield return new BookingImported(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut);
}

static IEnumerable<object> RecordPayment(BookingState state, object[] originalEvents, RecordPayment cmd) {
if (state.HasPayment(cmd.PaymentId)) yield break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ public async Task ExecuteOnNewStream() {

[Fact]
public async Task ExecuteOnExistingStream() {
var bookRoom = await Seed();

var bookRoom = await Seed();
var paymentTime = DateTimeOffset.Now;

var cmd = new Commands.RecordPayment(new BookingId(bookRoom.BookingId), "444", new Money(bookRoom.Price), paymentTime);
var cmd = new Commands.RecordPayment(new BookingId(bookRoom.BookingId), "444", new Money(bookRoom.Price), paymentTime);

var result = await _service.Handle(cmd, default);

Expand All @@ -57,12 +55,34 @@ public async Task ExecuteOnExistingStream() {
newEvents.Should().BeEquivalentTo(expectedResult);
}

async Task<Commands.BookRoom> Seed() {
[Fact]
public async Task ExecuteOnAnyForNewStream() {
var bookRoom = GetBookRoom();
var paymentTime = DateTimeOffset.Now;

Check warning on line 61 in src/Core/test/Eventuous.Tests.Application/FunctionalServiceTests.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Unused local variable

Local variable 'paymentTime' is never used

Check warning on line 61 in src/Core/test/Eventuous.Tests.Application/FunctionalServiceTests.cs

View workflow job for this annotation

GitHub Actions / Qodana for .NET

Unused local variable

Local variable 'paymentTime' is never used

var cmd = new Commands.ImportBooking {
BookingId = "dummy",
Price = bookRoom.Price,
CheckIn = bookRoom.CheckIn,
CheckOut = bookRoom.CheckOut,
RoomId = bookRoom.RoomId
};
var result = await _service.Handle(cmd, default);
result.Success.Should().BeTrue();
result.Changes.Should().HaveCount(1);
}

static Commands.BookRoom GetBookRoom() {
var checkIn = LocalDate.FromDateTime(DateTime.Today);
var checkOut = checkIn.PlusDays(1);
var cmd = new Commands.BookRoom("123", "234", checkIn, checkOut, 100);

return new Commands.BookRoom("123", "234", checkIn, checkOut, 100);
}

async Task<Commands.BookRoom> Seed() {
var cmd = GetBookRoom();
await _service.Handle(cmd, default);

return cmd;
}

Expand Down
4 changes: 1 addition & 3 deletions test/Eventuous.TestHelpers/Fakes/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,10 @@ CancellationToken cancellationToken

// ReSharper disable once ReturnTypeCanBeEnumerable.Local
InMemoryStream FindStream(StreamName stream) {
if (!_storage.TryGetValue(stream, out var existing)) throw new NotFound(stream);
if (!_storage.TryGetValue(stream, out var existing)) throw new StreamNotFound(stream);

return existing;
}

class NotFound(StreamName stream) : Exception($"Stream not found: {stream}");
}

record StoredEvent(StreamEvent Event, int Position);
Expand Down

0 comments on commit 330506f

Please sign in to comment.