From cacf3e573b460469786314428617c4ce43387194 Mon Sep 17 00:00:00 2001 From: sakno Date: Thu, 30 May 2024 21:11:13 +0300 Subject: [PATCH] Cluster 2.6.0 --- CHANGELOG.md | 10 ++++ README.md | 13 +++-- .../Raft/Commands/CommandInterpreterTests.cs | 33 +++++++----- .../Raft/MemoryBasedStateMachineTests.cs | 19 +++++-- .../Cluster/Consensus/Raft/TestLogEntry.cs | 8 ++- .../DotNext.Threading.csproj | 2 +- .../Threading/AsyncCountdownEvent.cs | 4 +- .../Threading/AsyncTrigger.cs | 4 +- .../Threading/QueuedSynchronizer.cs | 32 ++++++----- .../DotNext.AspNetCore.Cluster.csproj | 2 +- .../DotNext.Net.Cluster.csproj | 2 +- .../Cluster/Consensus/Raft/BinaryLogEntry.cs | 22 ++++++-- .../CommandInterpreter.CommandHandler.cs | 19 +++++-- .../Commands/CommandInterpreter.Registry.cs | 8 ++- .../Raft/Commands/CommandInterpreter.cs | 42 ++++++++++++--- .../Raft/Commands/CommandRegistry.Builder.cs | 38 +++++++++++-- .../Consensus/Raft/DiskBasedStateMachine.cs | 10 +++- .../Cluster/Consensus/Raft/EmptyLogEntry.cs | 7 ++- .../Cluster/Consensus/Raft/IBinaryLogEntry.cs | 8 --- .../Cluster/Consensus/Raft/IInputLogEntry.cs | 17 ++++++ .../Cluster/Consensus/Raft/JsonLogEntry.cs | 9 +++- .../Consensus/Raft/MemoryBasedStateMachine.cs | 20 ++++++- .../Consensus/Raft/PersistentState.Cache.cs | 21 ++++++-- .../Raft/PersistentState.LogEntry.cs | 11 +++- .../Raft/PersistentState.Partition.cs | 54 ++++++++++++++++++- .../Cluster/Consensus/Raft/PersistentState.cs | 23 ++++++-- 26 files changed, 352 insertions(+), 86 deletions(-) delete mode 100644 src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IBinaryLogEntry.cs create mode 100644 src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index d52360588..f7a801acc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,16 @@ Release Notes ==== +# 05-30-2024 +DotNext.Metaprogramming 5.4.1 +* Smallish performance improvements for all synchronization primitives + +DotNext.Net.Cluster 5.6.0 +* Added support of custom data to be passed to `PersistentState.ApplyAsync` method through WAL processing pipeline + +DotNext.AspNetCore.Cluster 5.6.0 +* Updated dependencies + # 05-21-2024 DotNext.Metaprogramming 5.4.0 * Smallish performance improvements of `IndexPool` instance methods diff --git a/README.md b/README.md index e20e8240f..9497152f7 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,16 @@ All these things are implemented in 100% managed code on top of existing .NET AP * [NuGet Packages](https://www.nuget.org/profiles/rvsakno) # What's new -Release Date: 05-21-2024 +Release Date: 05-30-2024 -DotNext.Metaprogramming 5.4.0 -* Smallish performance improvements of `IndexPool` instance methods -* Added ability to instantiate empty `IndexPool` +DotNext.Metaprogramming 5.4.1 +* Smallish performance improvements for all synchronization primitives + +DotNext.Net.Cluster 5.6.0 +* Added support of custom data to be passed to `PersistentState.ApplyAsync` method through WAL processing pipeline + +DotNext.AspNetCore.Cluster 5.6.0 +* Updated dependencies Changelog for previous versions located [here](./CHANGELOG.md). diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs index 781f9655e..246ee0196 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/Commands/CommandInterpreterTests.cs @@ -127,8 +127,11 @@ internal static ValueTask DoBinaryOperation(ref int value, BinaryOperationComman } [CommandHandler] - public ValueTask DoBinaryOperation(BinaryOperationCommand command, CancellationToken token) - => DoBinaryOperation(ref Value, command, token); + public ValueTask DoBinaryOperation(BinaryOperationCommand command, object context, CancellationToken token) + { + Null(context); + return DoBinaryOperation(ref Value, command, token); + } internal static ValueTask DoUnaryOperation(ref int value, UnaryOperationCommand command, CancellationToken token) { @@ -213,18 +216,11 @@ public static async Task MethodsAsHandlers() public static async Task DelegatesAsHandlers() { var state = new StrongBox(); - Func binaryOp = (command, token) => CustomInterpreter.DoBinaryOperation(ref state.Value, command, token); - Func unaryOp = (command, token) => CustomInterpreter.DoUnaryOperation(ref state.Value, command, token); - Func assignOp = (command, token) => - { - state.Value = command.Value; - return new ValueTask(); - }; var interpreter = new CommandInterpreter.Builder() - .Add(BinaryOperationCommand.Id, binaryOp) - .Add(UnaryOperationCommand.Id, unaryOp) - .Add(AssignCommand.Id, assignOp) + .Add(BinaryOperationCommand.Id, new Func(BinaryOp)) + .Add(UnaryOperationCommand.Id, new Func(UnaryOp)) + .Add(AssignCommand.Id, new Func(AssignOp)) .Build(); var entry1 = interpreter.CreateLogEntry(new BinaryOperationCommand { X = 40, Y = 2, Type = BinaryOperation.Add }, 1L); @@ -240,8 +236,19 @@ public static async Task DelegatesAsHandlers() var entry3 = interpreter.CreateLogEntry(new AssignCommand { Value = int.MaxValue }, 68L); Equal(68L, entry3.Term); - Equal(3, await interpreter.InterpretAsync(entry3)); + Equal(3, await interpreter.InterpretAsync(entry3, string.Empty)); Equal(int.MaxValue, state.Value); + + ValueTask BinaryOp(BinaryOperationCommand command, CancellationToken token) => CustomInterpreter.DoBinaryOperation(ref state.Value, command, token); + + ValueTask UnaryOp(UnaryOperationCommand command, CancellationToken token) => CustomInterpreter.DoUnaryOperation(ref state.Value, command, token); + + ValueTask AssignOp(AssignCommand command, object context, CancellationToken token) + { + NotNull(context); + state.Value = command.Value; + return ValueTask.CompletedTask; + } } [Fact] diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs index 815c4d2fc..fc9e3f636 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs @@ -41,6 +41,14 @@ protected override ValueTask ApplyAsync(LogEntry entry) False(entry.IsEmpty); True(entry.GetReader().TryGetRemainingBytesCount(out var length)); NotEqual(0L, length); + + switch (entry.Context) + { + case int value: + Equal(56, value); + break; + } + return ValueTask.CompletedTask; } @@ -96,9 +104,9 @@ public static async Task StateManipulations() { Equal(0, state.Term); Equal(1, await state.IncrementTermAsync(default)); - True(state.IsVotedFor(default(ClusterMemberId))); + True(state.IsVotedFor(default)); await state.UpdateVotedForAsync(member); - False(state.IsVotedFor(default(ClusterMemberId))); + False(state.IsVotedFor(default)); True(state.IsVotedFor(member)); } finally @@ -111,7 +119,7 @@ public static async Task StateManipulations() try { Equal(1, state.Term); - False(state.IsVotedFor(default(ClusterMemberId))); + False(state.IsVotedFor(default)); True(state.IsVotedFor(member)); } finally @@ -149,7 +157,7 @@ public static async Task EmptyLogEntry() [InlineData(1024, false, 65)] public static async Task QueryAppendEntries(long partitionSize, bool caching, int concurrentReads) { - var entry1 = new TestLogEntry("SET X = 0") { Term = 42L }; + var entry1 = new TestLogEntry("SET X = 0") { Term = 42L, Context = 56 }; var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L }; var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); Func, long?, CancellationToken, ValueTask> checker; @@ -175,6 +183,7 @@ public static async Task QueryAppendEntries(long partitionSize, bool caching, in Equal(0L, entries.First().Term); // element 0 Equal(42L, entries.Skip(1).First().Term); // element 1 Equal(entry1.Content, await entries[1].ToStringAsync(Encoding.UTF8)); + Equal(entry1.Context, IsAssignableFrom(entries[1]).Context); return Missing.Value; }; @@ -460,7 +469,7 @@ public static async Task PartitionOverflow(bool useCaching) [InlineData(false)] public static async Task Commit(bool useCaching) { - var entry1 = new TestLogEntry("SET X = 0") { Term = 42L }; + var entry1 = new TestLogEntry("SET X = 0") { Term = 42L, Context = 56 }; var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L }; var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L }; var entry4 = new TestLogEntry("SET U = 3") { Term = 45L }; diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs index bace116fb..7ab7ac40e 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs @@ -6,7 +6,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; using TextMessage = Messaging.TextMessage; [ExcludeFromCodeCoverage] -internal sealed class TestLogEntry : TextMessage, IRaftLogEntry +internal sealed class TestLogEntry : TextMessage, IInputLogEntry { public TestLogEntry(string command) : base(command, "Entry") @@ -19,4 +19,10 @@ public TestLogEntry(string command) public long Term { get; set; } bool ILogEntry.IsSnapshot => false; + + public object Context + { + get; + init; + } } \ No newline at end of file diff --git a/src/DotNext.Threading/DotNext.Threading.csproj b/src/DotNext.Threading/DotNext.Threading.csproj index 03dad4d50..49c3dfe02 100644 --- a/src/DotNext.Threading/DotNext.Threading.csproj +++ b/src/DotNext.Threading/DotNext.Threading.csproj @@ -7,7 +7,7 @@ true true nullablePublicOnly - 5.4.0 + 5.4.1 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/DotNext.Threading/Threading/AsyncCountdownEvent.cs b/src/DotNext.Threading/Threading/AsyncCountdownEvent.cs index 7a77e84b5..a13b5dc61 100644 --- a/src/DotNext.Threading/Threading/AsyncCountdownEvent.cs +++ b/src/DotNext.Threading/Threading/AsyncCountdownEvent.cs @@ -293,7 +293,7 @@ internal ValueTask SignalAndWaitAsync(out bool completedSynchronously, Tim goto resume_suspended_callers; } - factory = EnqueueNode(ref pool, ref manager, throwOnTimeout: false); + factory = EnqueueNode(ref pool, WaitNodeFlags.None); } completedSynchronously = false; @@ -335,7 +335,7 @@ internal ValueTask SignalAndWaitAsync(out bool completedSynchronously, Cancellat goto resume_suspended_callers; } - factory = EnqueueNode(ref pool, ref manager, throwOnTimeout: true); + factory = EnqueueNode(ref pool, WaitNodeFlags.ThrowOnTimeout); } completedSynchronously = false; diff --git a/src/DotNext.Threading/Threading/AsyncTrigger.cs b/src/DotNext.Threading/Threading/AsyncTrigger.cs index 581a3333e..afbbfeeba 100644 --- a/src/DotNext.Threading/Threading/AsyncTrigger.cs +++ b/src/DotNext.Threading/Threading/AsyncTrigger.cs @@ -189,7 +189,7 @@ public ValueTask SignalAndWaitAsync(bool resumeAll, bool throwOnEmptyQueue suspendedCallers = Detach(resumeAll)?.SetResult(true, out signaled); factory = !signaled && throwOnEmptyQueue ? EmptyWaitQueueExceptionFactory.Instance - : EnqueueNode(ref pool, ref manager, throwOnTimeout: false); + : EnqueueNode(ref pool, WaitNodeFlags.None); } suspendedCallers?.Unwind(); @@ -229,7 +229,7 @@ public ValueTask SignalAndWaitAsync(bool resumeAll, bool throwOnEmptyQueue, Canc suspendedCallers = Detach(resumeAll)?.SetResult(true, out signaled); factory = !signaled && throwOnEmptyQueue ? EmptyWaitQueueExceptionFactory.Instance - : EnqueueNode(ref pool, ref manager, throwOnTimeout: true); + : EnqueueNode(ref pool, WaitNodeFlags.ThrowOnTimeout); } suspendedCallers?.Unwind(); diff --git a/src/DotNext.Threading/Threading/QueuedSynchronizer.cs b/src/DotNext.Threading/Threading/QueuedSynchronizer.cs index edd2d9b93..4f4be5c98 100644 --- a/src/DotNext.Threading/Threading/QueuedSynchronizer.cs +++ b/src/DotNext.Threading/Threading/QueuedSynchronizer.cs @@ -138,7 +138,7 @@ private protected void EnqueueNode(WaitNode node) SuspendedCallersMeter.Add(1, measurementTags); } - private protected TNode EnqueueNode(ref ValueTaskPool> pool, ref TLockManager manager, bool throwOnTimeout) + private protected TNode EnqueueNode(ref ValueTaskPool> pool, WaitNodeFlags flags) where TNode : WaitNode, IPooledManualResetCompletionSource>, new() where TLockManager : struct, ILockManager { @@ -146,7 +146,7 @@ private protected TNode EnqueueNode(ref ValueTaskPool(ref Valu break; } - factory = EnqueueNode(ref pool, ref manager, throwOnTimeout: true); + factory = EnqueueNode(ref pool, WaitNodeFlags.ThrowOnTimeout); } interruptedCallers?.Unwind(); @@ -296,7 +296,7 @@ private protected ValueTask TryAcquireAsync break; } - factory = EnqueueNode(ref pool, ref manager, throwOnTimeout: false); + factory = EnqueueNode(ref pool, WaitNodeFlags.None); } interruptedCallers?.Unwind(); @@ -460,11 +460,18 @@ internal CallerInformationStorage(Func callerInfoProvider) } } + [Flags] + internal enum WaitNodeFlags + { + None = 0, + ThrowOnTimeout = 1, + } + private protected abstract class WaitNode : LinkedValueTaskCompletionSource { private readonly WeakReference owner = new(target: null, trackResurrection: false); private Timestamp createdAt; - private bool throwOnTimeout; + private WaitNodeFlags flags; // stores information about suspended caller for debugging purposes internal object? CallerInfo @@ -483,17 +490,18 @@ protected override void Cleanup() [DebuggerBrowsable(DebuggerBrowsableState.Never)] internal bool NeedsRemoval => CompletionData is null; - internal void Initialize(QueuedSynchronizer owner, bool throwOnTimeout) + internal void Initialize(QueuedSynchronizer owner, WaitNodeFlags flags) { Debug.Assert(owner is not null); - this.throwOnTimeout = throwOnTimeout; + this.flags = flags; this.owner.SetTarget(owner); CallerInfo = owner.callerInfo?.Capture(); createdAt = new(); } - protected sealed override Result OnTimeout() => throwOnTimeout ? base.OnTimeout() : false; + protected sealed override Result OnTimeout() + => (flags & WaitNodeFlags.ThrowOnTimeout) is not 0 ? base.OnTimeout() : false; private protected static void AfterConsumed(T node) where T : WaitNode, IPooledManualResetCompletionSource> @@ -806,13 +814,13 @@ private bool TryAcquireCore(TContext context) return false; } - private WaitNode EnqueueNode(TContext context, bool throwOnTimeout) + private WaitNode EnqueueNode(TContext context, WaitNodeFlags flags) { Debug.Assert(Monitor.IsEntered(SyncRoot)); var node = pool.Get(); node.Context = context; - node.Initialize(this, throwOnTimeout); + node.Initialize(this, flags); EnqueueNode(node); return node; } @@ -868,7 +876,7 @@ protected ValueTask TryAcquireAsync(TContext context, TimeSpan timeout, Ca break; } - factory = EnqueueNode(context, throwOnTimeout: false); + factory = EnqueueNode(context, WaitNodeFlags.None); } task = factory.Invoke(timeout, token); @@ -932,7 +940,7 @@ protected ValueTask AcquireAsync(TContext context, TimeSpan timeout, Cancellatio break; } - factory = EnqueueNode(context, throwOnTimeout: true); + factory = EnqueueNode(context, WaitNodeFlags.ThrowOnTimeout); } task = factory.Invoke(timeout, token); diff --git a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj index 24b5d64d3..d72e5eeaa 100644 --- a/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj +++ b/src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj @@ -8,7 +8,7 @@ true true nullablePublicOnly - 5.5.1 + 5.6.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj index f90f49d3c..0eaf8bcf6 100644 --- a/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj +++ b/src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj @@ -8,7 +8,7 @@ enable true nullablePublicOnly - 5.5.1 + 5.6.0 .NET Foundation and Contributors .NEXT Family of Libraries diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs index 0070cdfb6..dbfddd131 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs @@ -12,7 +12,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; /// /// Binary-formattable type. [StructLayout(LayoutKind.Auto)] -public readonly struct BinaryLogEntry() : IBinaryLogEntry +public readonly struct BinaryLogEntry() : IInputLogEntry, ISupplier, MemoryOwner> where T : notnull, IBinaryFormattable { /// @@ -44,8 +44,15 @@ public readonly struct BinaryLogEntry() : IBinaryLogEntry /// long? IDataTransferObject.Length => T.Size; + /// + public object? Context + { + get; + init; + } + /// - MemoryOwner IBinaryLogEntry.ToBuffer(MemoryAllocator allocator) + MemoryOwner ISupplier, MemoryOwner>.Invoke(MemoryAllocator allocator) => IBinaryFormattable.Format(Content, allocator); /// @@ -57,7 +64,7 @@ ValueTask IDataTransferObject.WriteToAsync(TWriter writer, Cancellation /// Represents default implementation of . /// [StructLayout(LayoutKind.Auto)] -public readonly struct BinaryLogEntry() : IBinaryLogEntry +public readonly struct BinaryLogEntry() : IInputLogEntry, ISupplier, MemoryOwner> { private readonly ReadOnlyMemory content; @@ -98,6 +105,13 @@ public int? CommandId /// bool IDataTransferObject.IsReusable => true; + /// + public object? Context + { + get; + init; + } + /// bool IDataTransferObject.TryGetMemory(out ReadOnlyMemory memory) { @@ -106,7 +120,7 @@ bool IDataTransferObject.TryGetMemory(out ReadOnlyMemory memory) } /// - MemoryOwner IBinaryLogEntry.ToBuffer(MemoryAllocator allocator) + MemoryOwner ISupplier, MemoryOwner>.Invoke(MemoryAllocator allocator) => content.Span.Copy(allocator); /// diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs index 57b730335..8a95a08e6 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.CommandHandler.cs @@ -9,18 +9,29 @@ public partial class CommandInterpreter { private abstract class CommandHandler { - internal abstract ValueTask InterpretAsync(TReader reader, CancellationToken token) + internal abstract ValueTask InterpretAsync(TReader reader, object? context, CancellationToken token) where TReader : notnull, IAsyncBinaryReader; } - private sealed class CommandHandler(Func handler) : CommandHandler + private sealed class CommandHandler(Func handler) : CommandHandler where TCommand : notnull, ISerializable { + public CommandHandler(Func handler) + : this(handler.Invoke) + { + } + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - internal override async ValueTask InterpretAsync(TReader reader, CancellationToken token) + internal override async ValueTask InterpretAsync(TReader reader, object? context, CancellationToken token) { var command = await TCommand.ReadFromAsync(reader, token).ConfigureAwait(false); - await handler(command, token).ConfigureAwait(false); + await handler(command, context, token).ConfigureAwait(false); } } +} + +file static class CommandHandlerExtensions +{ + public static ValueTask Invoke(this Func handler, TCommand command, object? context, CancellationToken token) + => handler.Invoke(command, token); } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs index 282778ba5..430410777 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.Registry.cs @@ -25,9 +25,15 @@ internal InterpretingTransformation(int id, IHandlerRegistry registry) this.id = id; } + internal object? Context + { + private get; + init; + } + async ValueTask IDataTransferObject.ITransformation.TransformAsync(TReader reader, CancellationToken token) { - await handler.InterpretAsync(reader, token).ConfigureAwait(false); + await handler.InterpretAsync(reader, Context, token).ConfigureAwait(false); return id; } } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs index b41cd55b7..e2d5945f9 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandInterpreter.cs @@ -4,6 +4,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft.Commands; +using DotNext.Runtime; using IO.Log; using Runtime.Serialization; using static Reflection.MethodExtensions; @@ -65,14 +66,23 @@ protected CommandInterpreter() if (handlerAttr is not null && method.ReturnType == typeof(ValueTask)) { var parameters = method.GetParameterTypes(); - if (parameters.GetLength() is not 2 || !parameters[0].IsValueType || parameters[1] != typeof(CancellationToken)) - continue; - var commandType = parameters[0]; - if (!identifiers.TryGetValue(commandType, out var commandId)) + Delegate interpreter; + switch (parameters.GetLength()) + { + case 2 when parameters[0].IsValueType && parameters[1] == typeof(CancellationToken): + interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(parameters[0], parameters[1], typeof(ValueTask)), method.IsStatic ? null : this, method); + break; + case 3 when parameters[0].IsValueType && parameters[1] == typeof(object) && parameters[2] == typeof(CancellationToken): + interpreter = Delegate.CreateDelegate(typeof(Func<,,,>).MakeGenericType(parameters[0], parameters[1], parameters[2], typeof(ValueTask)), method.IsStatic ? null : this, method); + break; + default: + continue; + } + + if (!identifiers.TryGetValue(parameters[0], out var commandId)) continue; - var interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(commandType, typeof(CancellationToken), typeof(ValueTask)), method.IsStatic ? null : this, method); - interpreters.Add(commandId, Cast(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(commandType), interpreter))); + interpreters.Add(commandId, Cast(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(parameters[0]), interpreter))); if (handlerAttr.IsSnapshotHandler) snapshotCommandId = commandId; @@ -129,4 +139,24 @@ public ValueTask InterpretAsync(TEntry entry, CancellationToken tok => TryGetCommandId(ref entry, out var id) ? entry.TransformAsync(new InterpretingTransformation(id, interpreters), token) : ValueTask.FromException(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry))); + + /// + /// Interprets log entry asynchronously. + /// + /// + /// Typically this method is called by the custom implementation of + /// method. + /// + /// The log entry to be interpreted. + /// The context to be passed to the handler. + /// The token that can be used to cancel the interpretation. + /// The type of the log entry to be interpreted. + /// The ID of the interpreted log entry. + /// The command handler was not registered for the command represented by . + /// The operation has been canceled. + public ValueTask InterpretAsync(TEntry entry, object? context, CancellationToken token = default) + where TEntry : struct, IRaftLogEntry + => TryGetCommandId(ref entry, out var id) ? + entry.TransformAsync(new InterpretingTransformation(id, interpreters) { Context = context }, token) : + ValueTask.FromException(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry))); } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs index 237dbc547..e02e94d78 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Commands/CommandRegistry.Builder.cs @@ -14,6 +14,17 @@ public sealed class Builder : ISupplier, IResettable private readonly Dictionary identifiers = new(); private int? snapshotCommandId; + private Builder Add(int commandId, CommandHandler handler, bool snapshotHandler) + where TCommand : notnull, ISerializable + { + identifiers.Add(typeof(TCommand), commandId); + interpreters.Add(commandId, handler); + if (snapshotHandler) + snapshotCommandId = commandId; + + return this; + } + /// /// Registers command handler. /// @@ -32,11 +43,28 @@ public Builder Add(int commandId, Func(handler)); - if (snapshotHandler) - snapshotCommandId = commandId; - return this; + return Add(commandId, new CommandHandler(handler), snapshotHandler); + } + + /// + /// Registers command handler. + /// + /// The identifier of the command. + /// The command handler. + /// + /// to register a handler for snapshot log entry; + /// to register a handler for regular log entry. + /// + /// The type of the command supported by the handler. + /// This builder. + /// is . + /// Type is not annotated with attribute. + public Builder Add(int commandId, Func handler, bool snapshotHandler = false) + where TCommand : notnull, ISerializable + { + ArgumentNullException.ThrowIfNull(handler); + + return Add(commandId, new CommandHandler(handler), snapshotHandler); } /// diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs index a02be4e62..a6d4114b5 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs @@ -71,13 +71,21 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c Volatile.Write(ref lastTerm, entry.Term); // Remove log entry from the cache according to eviction policy + var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex; if (!entry.IsPersisted) { await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false); // Flush partition if we are finished or at the last entry in it - if (startIndex == commitIndex || startIndex == partition.LastIndex) + if (lastEntryInPartition) + { await partition.FlushAsync(token).ConfigureAwait(false); + partition.ClearContext(startIndex); + } + } + else if (lastEntryInPartition) + { + partition.ClearContext(startIndex); } if (snapshotLength.HasValue) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs index 31191c86d..a80dc7276 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs @@ -2,6 +2,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; +using DotNext.Buffers; using IO; using IO.Log; @@ -9,7 +10,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; /// Represents No-OP entry. /// [StructLayout(LayoutKind.Auto)] -public readonly struct EmptyLogEntry() : IRaftLogEntry +public readonly struct EmptyLogEntry() : IRaftLogEntry, ISupplier, MemoryOwner> { /// int? IRaftLogEntry.CommandId => null; @@ -47,4 +48,8 @@ ValueTask IDataTransferObject.WriteToAsync(TWriter writer, Cancellation /// ValueTask IDataTransferObject.TransformAsync(TTransformation transformation, CancellationToken token) => IDataTransferObject.Empty.TransformAsync(transformation, token); + + /// + MemoryOwner ISupplier, MemoryOwner>.Invoke(MemoryAllocator allocator) + => default; } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IBinaryLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IBinaryLogEntry.cs deleted file mode 100644 index d9a647518..000000000 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IBinaryLogEntry.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace DotNext.Net.Cluster.Consensus.Raft; - -using Buffers; - -internal interface IBinaryLogEntry : IRaftLogEntry -{ - MemoryOwner ToBuffer(MemoryAllocator allocator); -} \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs new file mode 100644 index 000000000..4e904f3ac --- /dev/null +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs @@ -0,0 +1,17 @@ +namespace DotNext.Net.Cluster.Consensus.Raft; + +/// +/// Represents a custom log entry that can be passed to the log. +/// +public interface IInputLogEntry : IRaftLogEntry +{ + /// + /// Gets or sets runtime context associated with the log entry. + /// + /// + /// The value passes through + /// to or . + /// It can be retrieved by using property. + /// + object? Context { get; init; } +} \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs index 860a87e2c..454af1bc7 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs @@ -10,7 +10,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; /// /// JSON-serializable type. [StructLayout(LayoutKind.Auto)] -public readonly struct JsonLogEntry() : IRaftLogEntry +public readonly struct JsonLogEntry() : IInputLogEntry where T : notnull, IJsonSerializable { /// @@ -37,6 +37,13 @@ public readonly struct JsonLogEntry() : IRaftLogEntry /// int? IRaftLogEntry.CommandId => null; + /// + public object? Context + { + get; + init; + } + /// ValueTask IDataTransferObject.WriteToAsync(TWriter writer, CancellationToken token) => JsonSerializable.SerializeAsync(writer, Content, token); diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs index 1eed8b47e..32c0d1639 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs @@ -666,12 +666,20 @@ long ComputeUpperBoundIndex(long count) /// /// Applies the command represented by the log entry to the underlying database engine. /// + /// + /// can be used to pass custom data + /// through WAL processing pipeline. The value of this property is not persistent. + /// The data can be passed as a part of log entry which is used as + /// the argument of method. + /// The passed log entry type must implement interface. + /// /// The entry to be applied to the state machine. /// The task representing asynchronous execution of this method. /// protected abstract ValueTask ApplyAsync(LogEntry entry); - private ValueTask ApplyCoreAsync(LogEntry entry) => entry.IsEmpty ? new() : ApplyAsync(entry); // skip empty log entry + private ValueTask ApplyCoreAsync(LogEntry entry) + => entry.IsEmpty ? ValueTask.CompletedTask : ApplyAsync(entry); // skip empty log entry [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationToken token) @@ -686,13 +694,21 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT Volatile.Write(ref lastTerm, entry.Term); // Remove log entry from the cache according to eviction policy + var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex; if (!entry.IsPersisted) { await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false); // Flush partition if we are finished or at the last entry in it - if (startIndex == commitIndex || startIndex == partition.LastIndex) + if (lastEntryInPartition) + { await partition.FlushAsync(token).ConfigureAwait(false); + partition.ClearContext(startIndex); + } + } + else if (lastEntryInPartition) + { + partition.ClearContext(startIndex); } } else diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs index 6f6124204..992963b03 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs @@ -9,12 +9,17 @@ namespace DotNext.Net.Cluster.Consensus.Raft; public partial class PersistentState { [StructLayout(LayoutKind.Auto)] - internal struct CacheRecord() : IDisposable + internal struct CacheRecord : IDisposable { internal MemoryOwner Content; - internal CachedLogEntryPersistenceMode PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer; + internal CachedLogEntryPersistenceMode PersistenceMode; + internal object? Context; - public void Dispose() => Content.Dispose(); + public void Dispose() + { + Context = null; + Content.Dispose(); + } } internal enum CachedLogEntryPersistenceMode : byte @@ -28,10 +33,16 @@ internal enum CachedLogEntryPersistenceMode : byte /// Represents buffered Raft log entry. /// [StructLayout(LayoutKind.Auto)] - internal readonly struct CachedLogEntry : IRaftLogEntry + internal readonly struct CachedLogEntry : IInputLogEntry { private readonly CacheRecord record; + public object? Context + { + get => record.Context; + init => record.Context = value; + } + internal CachedLogEntryPersistenceMode PersistenceMode { get => record.PersistenceMode; @@ -46,7 +57,7 @@ required internal MemoryOwner Content required public long Term { get; init; } - public int? CommandId { get; init; } + required public int? CommandId { get; init; } internal long Length => record.Content.Length; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs index 5b54e7c3b..153ebf583 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs @@ -18,7 +18,7 @@ public partial class PersistentState /// Use to decode the log entry. /// [StructLayout(LayoutKind.Auto)] - protected internal readonly struct LogEntry : IRaftLogEntry + protected internal readonly struct LogEntry : IInputLogEntry { // null (if empty), FileReader, IAsyncBinaryReader, or byte[], or MemoryManager private readonly object? content; @@ -50,6 +50,15 @@ internal bool IsPersisted init; } + /// + /// Gets or sets context associated with this log entry. + /// + public object? Context + { + get; + init; + } + internal IAsyncBinaryReader? ContentReader { init => content = metadata.Length > 0L ? value : IAsyncBinaryReader.Empty; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index 3d210ec7a..bd76414a7 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -15,10 +15,11 @@ public partial class PersistentState private protected abstract class Partition : ConcurrentStorageAccess { internal const int MaxRecordsPerPartition = int.MaxValue / LogEntryMetadata.Size; - protected static readonly CacheRecord EmptyRecord = new(); + protected static readonly CacheRecord EmptyRecord = new() { PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer }; internal readonly long FirstIndex, PartitionNumber, LastIndex; private Partition? previous, next; + private object?[]? context; protected MemoryOwner entryCache; protected int runningIndex; @@ -95,6 +96,27 @@ internal bool Contains(long recordIndex) internal abstract void Initialize(); + internal void ClearContext(long absoluteIndex) + { + Debug.Assert(absoluteIndex >= FirstIndex); + Debug.Assert(absoluteIndex <= LastIndex); + + if (context is not null) + { + var relativeIndex = ToRelativeIndex(absoluteIndex); + + if (relativeIndex == context.Length - 1) + { + Array.Clear(context); + context = null; + } + else + { + Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(context), relativeIndex) = null; + } + } + } + internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = false) { Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}"); @@ -116,6 +138,7 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa { ContentReader = GetSessionReader(sessionId), IsPersisted = true, + Context = GetContext(relativeIndex), }; } @@ -124,7 +147,17 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa { ContentBuffer = cachedContent.Content.Memory, IsPersisted = cachedContent.PersistenceMode is not CachedLogEntryPersistenceMode.None, + Context = cachedContent.Context, }; + + object? GetContext(int index) + { + Debug.Assert(index <= ToRelativeIndex(LastIndex)); + + return context is not null + ? Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(context), index) + : null; + } } internal ValueTask PersistCachedEntryAsync(long absoluteIndex, bool removeFromMemory) @@ -227,6 +260,10 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella goto exit; } } + else if (entry is IInputLogEntry && ((IInputLogEntry)entry).Context is { } context) + { + SetContext(relativeIndex, context); + } // invalidate cached log entry on write if (!entryCache.IsEmpty) @@ -234,6 +271,14 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella exit: return PersistAsync(entry, relativeIndex, token); + + void SetContext(int relativeIndex, object context) + { + Debug.Assert(context is not null); + + this.context ??= new object?[ToRelativeIndex(LastIndex) + 1]; + Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(this.context), relativeIndex) = context; + } } protected override void Dispose(bool disposing) @@ -241,6 +286,13 @@ protected override void Dispose(bool disposing) if (disposing) { previous = next = null; + + if (context is not null) + { + Array.Clear(context); + context = null; + } + entryCache.ReleaseAll(); } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs index d35bbc03b..2f03a5ceb 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs @@ -6,6 +6,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; using Collections.Specialized; +using DotNext.Buffers; using IO.Log; using static IO.DataTransferObject; using AsyncTrigger = Threading.AsyncTrigger; @@ -343,7 +344,7 @@ private async ValueTask ReadBufferedAsync(ILogEntryConsumer(bufferingConsumer, session, startIndex, endIndex ?? state.LastIndex, token).ConfigureAwait(false); + (bufferedEntries, snapshotIndex) = await UnsafeReadAsync(bufferingConsumer, session, startIndex, endIndex ?? state.LastIndex, token).ConfigureAwait(false); } finally { @@ -676,7 +677,14 @@ private async ValueTask AppendUncachedAsync(TEntry entry, Cancella private async ValueTask AppendCachedAsync(TEntry entry, CancellationToken token) where TEntry : notnull, IRaftLogEntry - => await AppendCachedAsync(new CachedLogEntry { Content = await entry.ToMemoryAsync(bufferManager.BufferAllocator, token).ConfigureAwait(false), Term = entry.Term, Timestamp = entry.Timestamp, CommandId = entry.CommandId }, token).ConfigureAwait(false); + => await AppendCachedAsync(new CachedLogEntry + { + Content = await entry.ToMemoryAsync(bufferManager.BufferAllocator, token).ConfigureAwait(false), + Term = entry.Term, + Timestamp = entry.Timestamp, + CommandId = entry.CommandId, + Context = entry is IInputLogEntry ? ((IInputLogEntry)entry).Context : null, + }, token).ConfigureAwait(false); private async ValueTask AppendCachedAsync(CachedLogEntry cachedEntry, CancellationToken token) { @@ -744,8 +752,15 @@ public ValueTask AppendAsync(TEntry entry, bool addToCache, Cancel } else if (bufferManager.IsCachingEnabled && addToCache) { - result = entry is IBinaryLogEntry - ? AppendCachedAsync(new CachedLogEntry { Content = ((IBinaryLogEntry)entry).ToBuffer(bufferManager.BufferAllocator), Term = entry.Term, Timestamp = entry.Timestamp, CommandId = entry.CommandId }, token) + result = entry is ISupplier, MemoryOwner> + ? AppendCachedAsync(new CachedLogEntry + { + Content = ((ISupplier, MemoryOwner>)entry).Invoke(bufferManager.BufferAllocator), + Term = entry.Term, + Timestamp = entry.Timestamp, + CommandId = entry.CommandId, + Context = entry is IInputLogEntry ? ((IInputLogEntry)entry).Context : null, + }, token) : AppendCachedAsync(entry, token); } else