diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs index 815c4d2fc..fc9e3f636 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs @@ -41,6 +41,14 @@ protected override ValueTask ApplyAsync(LogEntry entry) False(entry.IsEmpty); True(entry.GetReader().TryGetRemainingBytesCount(out var length)); NotEqual(0L, length); + + switch (entry.Context) + { + case int value: + Equal(56, value); + break; + } + return ValueTask.CompletedTask; } @@ -96,9 +104,9 @@ public static async Task StateManipulations() { Equal(0, state.Term); Equal(1, await state.IncrementTermAsync(default)); - True(state.IsVotedFor(default(ClusterMemberId))); + True(state.IsVotedFor(default)); await state.UpdateVotedForAsync(member); - False(state.IsVotedFor(default(ClusterMemberId))); + False(state.IsVotedFor(default)); True(state.IsVotedFor(member)); } finally @@ -111,7 +119,7 @@ public static async Task StateManipulations() try { Equal(1, state.Term); - False(state.IsVotedFor(default(ClusterMemberId))); + False(state.IsVotedFor(default)); True(state.IsVotedFor(member)); } finally @@ -149,7 +157,7 @@ public static async Task EmptyLogEntry() [InlineData(1024, false, 65)] public static async Task QueryAppendEntries(long partitionSize, bool caching, int concurrentReads) { - var entry1 = new TestLogEntry("SET X = 0") { Term = 42L }; + var entry1 = new TestLogEntry("SET X = 0") { Term = 42L, Context = 56 }; var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L }; var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); Func, long?, CancellationToken, ValueTask> checker; @@ -175,6 +183,7 @@ public static async Task QueryAppendEntries(long partitionSize, bool caching, in Equal(0L, entries.First().Term); // element 0 Equal(42L, entries.Skip(1).First().Term); // element 1 Equal(entry1.Content, await entries[1].ToStringAsync(Encoding.UTF8)); + Equal(entry1.Context, IsAssignableFrom(entries[1]).Context); return Missing.Value; }; @@ -460,7 +469,7 @@ public static async Task PartitionOverflow(bool useCaching) [InlineData(false)] public static async Task Commit(bool useCaching) { - var entry1 = new TestLogEntry("SET X = 0") { Term = 42L }; + var entry1 = new TestLogEntry("SET X = 0") { Term = 42L, Context = 56 }; var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L }; var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L }; var entry4 = new TestLogEntry("SET U = 3") { Term = 45L }; diff --git a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs index bace116fb..7ab7ac40e 100644 --- a/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs +++ b/src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs @@ -6,7 +6,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; using TextMessage = Messaging.TextMessage; [ExcludeFromCodeCoverage] -internal sealed class TestLogEntry : TextMessage, IRaftLogEntry +internal sealed class TestLogEntry : TextMessage, IInputLogEntry { public TestLogEntry(string command) : base(command, "Entry") @@ -19,4 +19,10 @@ public TestLogEntry(string command) public long Term { get; set; } bool ILogEntry.IsSnapshot => false; + + public object Context + { + get; + init; + } } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs index a61ce5afa..dbfddd131 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/BinaryLogEntry.cs @@ -12,7 +12,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; /// /// Binary-formattable type. [StructLayout(LayoutKind.Auto)] -public readonly struct BinaryLogEntry() : IRaftLogEntry, ISupplier, MemoryOwner> +public readonly struct BinaryLogEntry() : IInputLogEntry, ISupplier, MemoryOwner> where T : notnull, IBinaryFormattable { /// @@ -44,6 +44,13 @@ public readonly struct BinaryLogEntry() : IRaftLogEntry, ISupplier long? IDataTransferObject.Length => T.Size; + /// + public object? Context + { + get; + init; + } + /// MemoryOwner ISupplier, MemoryOwner>.Invoke(MemoryAllocator allocator) => IBinaryFormattable.Format(Content, allocator); @@ -57,7 +64,7 @@ ValueTask IDataTransferObject.WriteToAsync(TWriter writer, Cancellation /// Represents default implementation of . /// [StructLayout(LayoutKind.Auto)] -public readonly struct BinaryLogEntry() : IRaftLogEntry, ISupplier, MemoryOwner> +public readonly struct BinaryLogEntry() : IInputLogEntry, ISupplier, MemoryOwner> { private readonly ReadOnlyMemory content; @@ -98,6 +105,13 @@ public int? CommandId /// bool IDataTransferObject.IsReusable => true; + /// + public object? Context + { + get; + init; + } + /// bool IDataTransferObject.TryGetMemory(out ReadOnlyMemory memory) { diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs index a02be4e62..a6d4114b5 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/DiskBasedStateMachine.cs @@ -71,13 +71,21 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c Volatile.Write(ref lastTerm, entry.Term); // Remove log entry from the cache according to eviction policy + var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex; if (!entry.IsPersisted) { await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false); // Flush partition if we are finished or at the last entry in it - if (startIndex == commitIndex || startIndex == partition.LastIndex) + if (lastEntryInPartition) + { await partition.FlushAsync(token).ConfigureAwait(false); + partition.ClearContext(startIndex); + } + } + else if (lastEntryInPartition) + { + partition.ClearContext(startIndex); } if (snapshotLength.HasValue) diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs index 31191c86d..a80dc7276 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/EmptyLogEntry.cs @@ -2,6 +2,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; +using DotNext.Buffers; using IO; using IO.Log; @@ -9,7 +10,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; /// Represents No-OP entry. /// [StructLayout(LayoutKind.Auto)] -public readonly struct EmptyLogEntry() : IRaftLogEntry +public readonly struct EmptyLogEntry() : IRaftLogEntry, ISupplier, MemoryOwner> { /// int? IRaftLogEntry.CommandId => null; @@ -47,4 +48,8 @@ ValueTask IDataTransferObject.WriteToAsync(TWriter writer, Cancellation /// ValueTask IDataTransferObject.TransformAsync(TTransformation transformation, CancellationToken token) => IDataTransferObject.Empty.TransformAsync(transformation, token); + + /// + MemoryOwner ISupplier, MemoryOwner>.Invoke(MemoryAllocator allocator) + => default; } \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs new file mode 100644 index 000000000..135c5b647 --- /dev/null +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IInputLogEntry.cs @@ -0,0 +1,12 @@ +namespace DotNext.Net.Cluster.Consensus.Raft; + +/// +/// Represents a custom log entry that can be passed to the log. +/// +public interface IInputLogEntry : IRaftLogEntry +{ + /// + /// Gets or sets runtime context associated with the log entry. + /// + object? Context { get; init; } +} \ No newline at end of file diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs index 860a87e2c..454af1bc7 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/JsonLogEntry.cs @@ -10,7 +10,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft; /// /// JSON-serializable type. [StructLayout(LayoutKind.Auto)] -public readonly struct JsonLogEntry() : IRaftLogEntry +public readonly struct JsonLogEntry() : IInputLogEntry where T : notnull, IJsonSerializable { /// @@ -37,6 +37,13 @@ public readonly struct JsonLogEntry() : IRaftLogEntry /// int? IRaftLogEntry.CommandId => null; + /// + public object? Context + { + get; + init; + } + /// ValueTask IDataTransferObject.WriteToAsync(TWriter writer, CancellationToken token) => JsonSerializable.SerializeAsync(writer, Content, token); diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs index 1eed8b47e..c79393014 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/MemoryBasedStateMachine.cs @@ -671,7 +671,8 @@ long ComputeUpperBoundIndex(long count) /// protected abstract ValueTask ApplyAsync(LogEntry entry); - private ValueTask ApplyCoreAsync(LogEntry entry) => entry.IsEmpty ? new() : ApplyAsync(entry); // skip empty log entry + private ValueTask ApplyCoreAsync(LogEntry entry) + => entry.IsEmpty ? new() : ApplyAsync(entry); // skip empty log entry [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationToken token) @@ -686,13 +687,21 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT Volatile.Write(ref lastTerm, entry.Term); // Remove log entry from the cache according to eviction policy + var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex; if (!entry.IsPersisted) { await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false); // Flush partition if we are finished or at the last entry in it - if (startIndex == commitIndex || startIndex == partition.LastIndex) + if (lastEntryInPartition) + { await partition.FlushAsync(token).ConfigureAwait(false); + partition.ClearContext(startIndex); + } + } + else if (lastEntryInPartition) + { + partition.ClearContext(startIndex); } } else diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs index 6f6124204..992963b03 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Cache.cs @@ -9,12 +9,17 @@ namespace DotNext.Net.Cluster.Consensus.Raft; public partial class PersistentState { [StructLayout(LayoutKind.Auto)] - internal struct CacheRecord() : IDisposable + internal struct CacheRecord : IDisposable { internal MemoryOwner Content; - internal CachedLogEntryPersistenceMode PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer; + internal CachedLogEntryPersistenceMode PersistenceMode; + internal object? Context; - public void Dispose() => Content.Dispose(); + public void Dispose() + { + Context = null; + Content.Dispose(); + } } internal enum CachedLogEntryPersistenceMode : byte @@ -28,10 +33,16 @@ internal enum CachedLogEntryPersistenceMode : byte /// Represents buffered Raft log entry. /// [StructLayout(LayoutKind.Auto)] - internal readonly struct CachedLogEntry : IRaftLogEntry + internal readonly struct CachedLogEntry : IInputLogEntry { private readonly CacheRecord record; + public object? Context + { + get => record.Context; + init => record.Context = value; + } + internal CachedLogEntryPersistenceMode PersistenceMode { get => record.PersistenceMode; @@ -46,7 +57,7 @@ required internal MemoryOwner Content required public long Term { get; init; } - public int? CommandId { get; init; } + required public int? CommandId { get; init; } internal long Length => record.Content.Length; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs index 5b54e7c3b..153ebf583 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.LogEntry.cs @@ -18,7 +18,7 @@ public partial class PersistentState /// Use to decode the log entry. /// [StructLayout(LayoutKind.Auto)] - protected internal readonly struct LogEntry : IRaftLogEntry + protected internal readonly struct LogEntry : IInputLogEntry { // null (if empty), FileReader, IAsyncBinaryReader, or byte[], or MemoryManager private readonly object? content; @@ -50,6 +50,15 @@ internal bool IsPersisted init; } + /// + /// Gets or sets context associated with this log entry. + /// + public object? Context + { + get; + init; + } + internal IAsyncBinaryReader? ContentReader { init => content = metadata.Length > 0L ? value : IAsyncBinaryReader.Empty; diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs index 3d210ec7a..bd76414a7 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs @@ -15,10 +15,11 @@ public partial class PersistentState private protected abstract class Partition : ConcurrentStorageAccess { internal const int MaxRecordsPerPartition = int.MaxValue / LogEntryMetadata.Size; - protected static readonly CacheRecord EmptyRecord = new(); + protected static readonly CacheRecord EmptyRecord = new() { PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer }; internal readonly long FirstIndex, PartitionNumber, LastIndex; private Partition? previous, next; + private object?[]? context; protected MemoryOwner entryCache; protected int runningIndex; @@ -95,6 +96,27 @@ internal bool Contains(long recordIndex) internal abstract void Initialize(); + internal void ClearContext(long absoluteIndex) + { + Debug.Assert(absoluteIndex >= FirstIndex); + Debug.Assert(absoluteIndex <= LastIndex); + + if (context is not null) + { + var relativeIndex = ToRelativeIndex(absoluteIndex); + + if (relativeIndex == context.Length - 1) + { + Array.Clear(context); + context = null; + } + else + { + Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(context), relativeIndex) = null; + } + } + } + internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = false) { Debug.Assert(absoluteIndex >= FirstIndex && absoluteIndex <= LastIndex, $"Invalid index value {absoluteIndex}, offset {FirstIndex}"); @@ -116,6 +138,7 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa { ContentReader = GetSessionReader(sessionId), IsPersisted = true, + Context = GetContext(relativeIndex), }; } @@ -124,7 +147,17 @@ internal LogEntry Read(int sessionId, long absoluteIndex, bool metadataOnly = fa { ContentBuffer = cachedContent.Content.Memory, IsPersisted = cachedContent.PersistenceMode is not CachedLogEntryPersistenceMode.None, + Context = cachedContent.Context, }; + + object? GetContext(int index) + { + Debug.Assert(index <= ToRelativeIndex(LastIndex)); + + return context is not null + ? Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(context), index) + : null; + } } internal ValueTask PersistCachedEntryAsync(long absoluteIndex, bool removeFromMemory) @@ -227,6 +260,10 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella goto exit; } } + else if (entry is IInputLogEntry && ((IInputLogEntry)entry).Context is { } context) + { + SetContext(relativeIndex, context); + } // invalidate cached log entry on write if (!entryCache.IsEmpty) @@ -234,6 +271,14 @@ internal ValueTask WriteAsync(TEntry entry, long absoluteIndex, Cancella exit: return PersistAsync(entry, relativeIndex, token); + + void SetContext(int relativeIndex, object context) + { + Debug.Assert(context is not null); + + this.context ??= new object?[ToRelativeIndex(LastIndex) + 1]; + Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(this.context), relativeIndex) = context; + } } protected override void Dispose(bool disposing) @@ -241,6 +286,13 @@ protected override void Dispose(bool disposing) if (disposing) { previous = next = null; + + if (context is not null) + { + Array.Clear(context); + context = null; + } + entryCache.ReleaseAll(); } diff --git a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs index 89f6178b5..2f03a5ceb 100644 --- a/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs +++ b/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs @@ -344,7 +344,7 @@ private async ValueTask ReadBufferedAsync(ILogEntryConsumer(bufferingConsumer, session, startIndex, endIndex ?? state.LastIndex, token).ConfigureAwait(false); + (bufferedEntries, snapshotIndex) = await UnsafeReadAsync(bufferingConsumer, session, startIndex, endIndex ?? state.LastIndex, token).ConfigureAwait(false); } finally { @@ -677,7 +677,14 @@ private async ValueTask AppendUncachedAsync(TEntry entry, Cancella private async ValueTask AppendCachedAsync(TEntry entry, CancellationToken token) where TEntry : notnull, IRaftLogEntry - => await AppendCachedAsync(new CachedLogEntry { Content = await entry.ToMemoryAsync(bufferManager.BufferAllocator, token).ConfigureAwait(false), Term = entry.Term, Timestamp = entry.Timestamp, CommandId = entry.CommandId }, token).ConfigureAwait(false); + => await AppendCachedAsync(new CachedLogEntry + { + Content = await entry.ToMemoryAsync(bufferManager.BufferAllocator, token).ConfigureAwait(false), + Term = entry.Term, + Timestamp = entry.Timestamp, + CommandId = entry.CommandId, + Context = entry is IInputLogEntry ? ((IInputLogEntry)entry).Context : null, + }, token).ConfigureAwait(false); private async ValueTask AppendCachedAsync(CachedLogEntry cachedEntry, CancellationToken token) { @@ -746,7 +753,14 @@ public ValueTask AppendAsync(TEntry entry, bool addToCache, Cancel else if (bufferManager.IsCachingEnabled && addToCache) { result = entry is ISupplier, MemoryOwner> - ? AppendCachedAsync(new CachedLogEntry { Content = ((ISupplier, MemoryOwner>)entry).Invoke(bufferManager.BufferAllocator), Term = entry.Term, Timestamp = entry.Timestamp, CommandId = entry.CommandId }, token) + ? AppendCachedAsync(new CachedLogEntry + { + Content = ((ISupplier, MemoryOwner>)entry).Invoke(bufferManager.BufferAllocator), + Term = entry.Term, + Timestamp = entry.Timestamp, + CommandId = entry.CommandId, + Context = entry is IInputLogEntry ? ((IInputLogEntry)entry).Context : null, + }, token) : AppendCachedAsync(entry, token); } else