Skip to content

Commit

Permalink
Added context support
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed May 23, 2024
1 parent e543060 commit 54c3379
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ internal static ValueTask DoBinaryOperation(ref int value, BinaryOperationComman
}

[CommandHandler]
public ValueTask DoBinaryOperation(BinaryOperationCommand command, CancellationToken token)
=> DoBinaryOperation(ref Value, command, token);
public ValueTask DoBinaryOperation(BinaryOperationCommand command, object context, CancellationToken token)
{
Null(context);
return DoBinaryOperation(ref Value, command, token);
}

internal static ValueTask DoUnaryOperation(ref int value, UnaryOperationCommand command, CancellationToken token)
{
Expand Down Expand Up @@ -213,18 +216,11 @@ public static async Task MethodsAsHandlers()
public static async Task DelegatesAsHandlers()
{
var state = new StrongBox<int>();
Func<BinaryOperationCommand, CancellationToken, ValueTask> binaryOp = (command, token) => CustomInterpreter.DoBinaryOperation(ref state.Value, command, token);
Func<UnaryOperationCommand, CancellationToken, ValueTask> unaryOp = (command, token) => CustomInterpreter.DoUnaryOperation(ref state.Value, command, token);
Func<AssignCommand, CancellationToken, ValueTask> assignOp = (command, token) =>
{
state.Value = command.Value;
return new ValueTask();
};

var interpreter = new CommandInterpreter.Builder()
.Add(BinaryOperationCommand.Id, binaryOp)
.Add(UnaryOperationCommand.Id, unaryOp)
.Add(AssignCommand.Id, assignOp)
.Add(BinaryOperationCommand.Id, new Func<BinaryOperationCommand, CancellationToken, ValueTask>(BinaryOp))
.Add(UnaryOperationCommand.Id, new Func<UnaryOperationCommand, CancellationToken, ValueTask>(UnaryOp))
.Add(AssignCommand.Id, new Func<AssignCommand, object, CancellationToken, ValueTask>(AssignOp))
.Build();

var entry1 = interpreter.CreateLogEntry(new BinaryOperationCommand { X = 40, Y = 2, Type = BinaryOperation.Add }, 1L);
Expand All @@ -240,8 +236,19 @@ public static async Task DelegatesAsHandlers()

var entry3 = interpreter.CreateLogEntry(new AssignCommand { Value = int.MaxValue }, 68L);
Equal(68L, entry3.Term);
Equal(3, await interpreter.InterpretAsync(entry3));
Equal(3, await interpreter.InterpretAsync(entry3, string.Empty));
Equal(int.MaxValue, state.Value);

ValueTask BinaryOp(BinaryOperationCommand command, CancellationToken token) => CustomInterpreter.DoBinaryOperation(ref state.Value, command, token);

ValueTask UnaryOp(UnaryOperationCommand command, CancellationToken token) => CustomInterpreter.DoUnaryOperation(ref state.Value, command, token);

ValueTask AssignOp(AssignCommand command, object context, CancellationToken token)
{
NotNull(context);
state.Value = command.Value;
return ValueTask.CompletedTask;
}
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,29 @@ public partial class CommandInterpreter
{
private abstract class CommandHandler
{
internal abstract ValueTask InterpretAsync<TReader>(TReader reader, CancellationToken token)
internal abstract ValueTask InterpretAsync<TReader>(TReader reader, object? context, CancellationToken token)
where TReader : notnull, IAsyncBinaryReader;
}

private sealed class CommandHandler<TCommand>(Func<TCommand, CancellationToken, ValueTask> handler) : CommandHandler
private sealed class CommandHandler<TCommand>(Func<TCommand, object?, CancellationToken, ValueTask> handler) : CommandHandler
where TCommand : notnull, ISerializable<TCommand>
{
public CommandHandler(Func<TCommand, CancellationToken, ValueTask> handler)
: this(handler.Invoke<TCommand>)
{
}

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
internal override async ValueTask InterpretAsync<TReader>(TReader reader, CancellationToken token)
internal override async ValueTask InterpretAsync<TReader>(TReader reader, object? context, CancellationToken token)
{
var command = await TCommand.ReadFromAsync(reader, token).ConfigureAwait(false);
await handler(command, token).ConfigureAwait(false);
await handler(command, context, token).ConfigureAwait(false);
}
}
}

file static class CommandHandlerExtensions
{
public static ValueTask Invoke<TCommand>(this Func<TCommand, CancellationToken, ValueTask> handler, TCommand command, object? context, CancellationToken token)
=> handler.Invoke(command, token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ internal InterpretingTransformation(int id, IHandlerRegistry registry)
this.id = id;
}

internal object? Context
{
private get;
init;
}

async ValueTask<int> IDataTransferObject.ITransformation<int>.TransformAsync<TReader>(TReader reader, CancellationToken token)
{
await handler.InterpretAsync(reader, token).ConfigureAwait(false);
await handler.InterpretAsync(reader, Context, token).ConfigureAwait(false);
return id;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace DotNext.Net.Cluster.Consensus.Raft.Commands;

using DotNext.Runtime;
using IO.Log;
using Runtime.Serialization;
using static Reflection.MethodExtensions;
Expand Down Expand Up @@ -65,14 +66,23 @@ protected CommandInterpreter()
if (handlerAttr is not null && method.ReturnType == typeof(ValueTask))
{
var parameters = method.GetParameterTypes();
if (parameters.GetLength() is not 2 || !parameters[0].IsValueType || parameters[1] != typeof(CancellationToken))
continue;
var commandType = parameters[0];
if (!identifiers.TryGetValue(commandType, out var commandId))
Delegate interpreter;
switch (parameters.GetLength())
{
case 2 when parameters[0].IsValueType && parameters[1] == typeof(CancellationToken):
interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(parameters[0], parameters[1], typeof(ValueTask)), method.IsStatic ? null : this, method);
break;
case 3 when parameters[0].IsValueType && parameters[1] == typeof(object) && parameters[2] == typeof(CancellationToken):
interpreter = Delegate.CreateDelegate(typeof(Func<,,,>).MakeGenericType(parameters[0], parameters[1], parameters[2], typeof(ValueTask)), method.IsStatic ? null : this, method);
break;
default:
continue;
}

if (!identifiers.TryGetValue(parameters[0], out var commandId))
continue;

var interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(commandType, typeof(CancellationToken), typeof(ValueTask)), method.IsStatic ? null : this, method);
interpreters.Add(commandId, Cast<CommandHandler>(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(commandType), interpreter)));
interpreters.Add(commandId, Cast<CommandHandler>(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(parameters[0]), interpreter)));

if (handlerAttr.IsSnapshotHandler)
snapshotCommandId = commandId;
Expand Down Expand Up @@ -129,4 +139,24 @@ public ValueTask<int> InterpretAsync<TEntry>(TEntry entry, CancellationToken tok
=> TryGetCommandId(ref entry, out var id) ?
entry.TransformAsync<int, InterpretingTransformation>(new InterpretingTransformation(id, interpreters), token) :
ValueTask.FromException<int>(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry)));

/// <summary>
/// Interprets log entry asynchronously.
/// </summary>
/// <remarks>
/// Typically this method is called by the custom implementation of
/// <see cref="MemoryBasedStateMachine.ApplyAsync(PersistentState.LogEntry)"/> method.
/// </remarks>
/// <param name="entry">The log entry to be interpreted.</param>
/// <param name="context">The context to be passed to the handler.</param>
/// <param name="token">The token that can be used to cancel the interpretation.</param>
/// <typeparam name="TEntry">The type of the log entry to be interpreted.</typeparam>
/// <returns>The ID of the interpreted log entry.</returns>
/// <exception cref="UnknownCommandException">The command handler was not registered for the command represented by <paramref name="entry"/>.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
public ValueTask<int> InterpretAsync<TEntry>(TEntry entry, object? context, CancellationToken token = default)
where TEntry : struct, IRaftLogEntry
=> TryGetCommandId(ref entry, out var id) ?
entry.TransformAsync<int, InterpretingTransformation>(new InterpretingTransformation(id, interpreters) { Context = context }, token) :
ValueTask.FromException<int>(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ public sealed class Builder : ISupplier<CommandInterpreter>, IResettable
private readonly Dictionary<Type, int> identifiers = new();
private int? snapshotCommandId;

private Builder Add<TCommand>(int commandId, CommandHandler<TCommand> handler, bool snapshotHandler)
where TCommand : notnull, ISerializable<TCommand>
{
identifiers.Add(typeof(TCommand), commandId);
interpreters.Add(commandId, handler);
if (snapshotHandler)
snapshotCommandId = commandId;

return this;
}

/// <summary>
/// Registers command handler.
/// </summary>
Expand All @@ -32,11 +43,28 @@ public Builder Add<TCommand>(int commandId, Func<TCommand, CancellationToken, Va
{
ArgumentNullException.ThrowIfNull(handler);

identifiers.Add(typeof(TCommand), commandId);
interpreters.Add(commandId, new CommandHandler<TCommand>(handler));
if (snapshotHandler)
snapshotCommandId = commandId;
return this;
return Add(commandId, new CommandHandler<TCommand>(handler), snapshotHandler);
}

/// <summary>
/// Registers command handler.
/// </summary>
/// <param name="commandId">The identifier of the command.</param>
/// <param name="handler">The command handler.</param>
/// <param name="snapshotHandler">
/// <see langword="true"/> to register a handler for snapshot log entry;
/// <see langword="false"/> to register a handler for regular log entry.
/// </param>
/// <typeparam name="TCommand">The type of the command supported by the handler.</typeparam>
/// <returns>This builder.</returns>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is <see langword="null"/>.</exception>
/// <exception cref="GenericArgumentException">Type <typaparamref name="TCommand"/> is not annotated with <see cref="CommandAttribute"/> attribute.</exception>
public Builder Add<TCommand>(int commandId, Func<TCommand, object?, CancellationToken, ValueTask> handler, bool snapshotHandler = false)
where TCommand : notnull, ISerializable<TCommand>
{
ArgumentNullException.ThrowIfNull(handler);

return Add(commandId, new CommandHandler<TCommand>(handler), snapshotHandler);
}

/// <summary>
Expand Down

0 comments on commit 54c3379

Please sign in to comment.