diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs index 781f9655e..246ee0196 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs @@ -127,8 +127,11 @@ internal static ValueTask DoBinaryOperation(ref int value, BinaryOperationComman } [CommandHandler] - public ValueTask DoBinaryOperation(BinaryOperationCommand command, CancellationToken token) - => DoBinaryOperation(ref Value, command, token); + public ValueTask DoBinaryOperation(BinaryOperationCommand command, object context, CancellationToken token) + { + Null(context); + return DoBinaryOperation(ref Value, command, token); + } internal static ValueTask DoUnaryOperation(ref int value, UnaryOperationCommand command, CancellationToken token) { @@ -213,18 +216,11 @@ public static async Task MethodsAsHandlers() public static async Task DelegatesAsHandlers() { var state = new StrongBox(); - Func binaryOp = (command, token) => CustomInterpreter.DoBinaryOperation(ref state.Value, command, token); - Func unaryOp = (command, token) => CustomInterpreter.DoUnaryOperation(ref state.Value, command, token); - Func assignOp = (command, token) => - { - state.Value = command.Value; - return new ValueTask(); - }; var interpreter = new CommandInterpreter.Builder() - .Add(BinaryOperationCommand.Id, binaryOp) - .Add(UnaryOperationCommand.Id, unaryOp) - .Add(AssignCommand.Id, assignOp) + .Add(BinaryOperationCommand.Id, new Func(BinaryOp)) + .Add(UnaryOperationCommand.Id, new Func(UnaryOp)) + .Add(AssignCommand.Id, new Func(AssignOp)) .Build(); var entry1 = interpreter.CreateLogEntry(new BinaryOperationCommand { X = 40, Y = 2, Type = BinaryOperation.Add }, 1L); @@ -240,8 +236,19 @@ public static async Task DelegatesAsHandlers() var entry3 = interpreter.CreateLogEntry(new AssignCommand { Value = int.MaxValue }, 68L); Equal(68L, entry3.Term); - Equal(3, await interpreter.InterpretAsync(entry3)); + Equal(3, await interpreter.InterpretAsync(entry3, string.Empty)); Equal(int.MaxValue, state.Value); + + ValueTask BinaryOp(BinaryOperationCommand command, CancellationToken token) => CustomInterpreter.DoBinaryOperation(ref state.Value, command, token); + + ValueTask UnaryOp(UnaryOperationCommand command, CancellationToken token) => CustomInterpreter.DoUnaryOperation(ref state.Value, command, token); + + ValueTask AssignOp(AssignCommand command, object context, CancellationToken token) + { + NotNull(context); + state.Value = command.Value; + return ValueTask.CompletedTask; + } } [Fact] diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs index 57b730335..8a95a08e6 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs @@ -9,18 +9,29 @@ public partial class CommandInterpreter { private abstract class CommandHandler { - internal abstract ValueTask InterpretAsync(TReader reader, CancellationToken token) + internal abstract ValueTask InterpretAsync(TReader reader, object? context, CancellationToken token) where TReader : notnull, IAsyncBinaryReader; } - private sealed class CommandHandler(Func handler) : CommandHandler + private sealed class CommandHandler(Func handler) : CommandHandler where TCommand : notnull, ISerializable { + public CommandHandler(Func handler) + : this(handler.Invoke) + { + } + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - internal override async ValueTask InterpretAsync(TReader reader, CancellationToken token) + internal override async ValueTask InterpretAsync(TReader reader, object? context, CancellationToken token) { var command = await TCommand.ReadFromAsync(reader, token).ConfigureAwait(false); - await handler(command, token).ConfigureAwait(false); + await handler(command, context, token).ConfigureAwait(false); } } +} + +file static class CommandHandlerExtensions +{ + public static ValueTask Invoke(this Func handler, TCommand command, object? context, CancellationToken token) + => handler.Invoke(command, token); } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs index 282778ba5..430410777 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs @@ -25,9 +25,15 @@ internal InterpretingTransformation(int id, IHandlerRegistry registry) this.id = id; } + internal object? Context + { + private get; + init; + } + async ValueTask IDataTransferObject.ITransformation.TransformAsync(TReader reader, CancellationToken token) { - await handler.InterpretAsync(reader, token).ConfigureAwait(false); + await handler.InterpretAsync(reader, Context, token).ConfigureAwait(false); return id; } } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs index b41cd55b7..e2d5945f9 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs @@ -4,6 +4,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft.Commands; +using DotNext.Runtime; using IO.Log; using Runtime.Serialization; using static Reflection.MethodExtensions; @@ -65,14 +66,23 @@ protected CommandInterpreter() if (handlerAttr is not null && method.ReturnType == typeof(ValueTask)) { var parameters = method.GetParameterTypes(); - if (parameters.GetLength() is not 2 || !parameters[0].IsValueType || parameters[1] != typeof(CancellationToken)) - continue; - var commandType = parameters[0]; - if (!identifiers.TryGetValue(commandType, out var commandId)) + Delegate interpreter; + switch (parameters.GetLength()) + { + case 2 when parameters[0].IsValueType && parameters[1] == typeof(CancellationToken): + interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(parameters[0], parameters[1], typeof(ValueTask)), method.IsStatic ? null : this, method); + break; + case 3 when parameters[0].IsValueType && parameters[1] == typeof(object) && parameters[2] == typeof(CancellationToken): + interpreter = Delegate.CreateDelegate(typeof(Func<,,,>).MakeGenericType(parameters[0], parameters[1], parameters[2], typeof(ValueTask)), method.IsStatic ? null : this, method); + break; + default: + continue; + } + + if (!identifiers.TryGetValue(parameters[0], out var commandId)) continue; - var interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(commandType, typeof(CancellationToken), typeof(ValueTask)), method.IsStatic ? null : this, method); - interpreters.Add(commandId, Cast(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(commandType), interpreter))); + interpreters.Add(commandId, Cast(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(parameters[0]), interpreter))); if (handlerAttr.IsSnapshotHandler) snapshotCommandId = commandId; @@ -129,4 +139,24 @@ public ValueTask InterpretAsync(TEntry entry, CancellationToken tok => TryGetCommandId(ref entry, out var id) ? entry.TransformAsync(new InterpretingTransformation(id, interpreters), token) : ValueTask.FromException(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry))); + + /// + /// Interprets log entry asynchronously. + /// + /// + /// Typically this method is called by the custom implementation of + /// method. + /// + /// The log entry to be interpreted. + /// The context to be passed to the handler. + /// The token that can be used to cancel the interpretation. + /// The type of the log entry to be interpreted. + /// The ID of the interpreted log entry. + /// The command handler was not registered for the command represented by . + /// The operation has been canceled. + public ValueTask InterpretAsync(TEntry entry, object? context, CancellationToken token = default) + where TEntry : struct, IRaftLogEntry + => TryGetCommandId(ref entry, out var id) ? + entry.TransformAsync(new InterpretingTransformation(id, interpreters) { Context = context }, token) : + ValueTask.FromException(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry))); } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs index 237dbc547..e02e94d78 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs @@ -14,6 +14,17 @@ public sealed class Builder : ISupplier, IResettable private readonly Dictionary identifiers = new(); private int? snapshotCommandId; + private Builder Add(int commandId, CommandHandler handler, bool snapshotHandler) + where TCommand : notnull, ISerializable + { + identifiers.Add(typeof(TCommand), commandId); + interpreters.Add(commandId, handler); + if (snapshotHandler) + snapshotCommandId = commandId; + + return this; + } + /// /// Registers command handler. /// @@ -32,11 +43,28 @@ public Builder Add(int commandId, Func(handler)); - if (snapshotHandler) - snapshotCommandId = commandId; - return this; + return Add(commandId, new CommandHandler(handler), snapshotHandler); + } + + /// + /// Registers command handler. + /// + /// The identifier of the command. + /// The command handler. + /// + /// to register a handler for snapshot log entry; + /// to register a handler for regular log entry. + /// + /// The type of the command supported by the handler. + /// This builder. + /// is . + /// Type is not annotated with attribute. + public Builder Add(int commandId, Func handler, bool snapshotHandler = false) + where TCommand : notnull, ISerializable + { + ArgumentNullException.ThrowIfNull(handler); + + return Add(commandId, new CommandHandler(handler), snapshotHandler); } ///