Skip to content

Commit

Permalink
Added support of runtime context when processing log entries
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed May 27, 2024
1 parent 54c3379 commit 1c74d61
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ protected override ValueTask ApplyAsync(LogEntry entry)
False(entry.IsEmpty);
True(entry.GetReader().TryGetRemainingBytesCount(out var length));
NotEqual(0L, length);

switch (entry.Context)
{
case int value:
Equal(56, value);
break;
}

return ValueTask.CompletedTask;
}

Expand Down Expand Up @@ -96,9 +104,9 @@ public static async Task StateManipulations()
{
Equal(0, state.Term);
Equal(1, await state.IncrementTermAsync(default));
True(state.IsVotedFor(default(ClusterMemberId)));
True(state.IsVotedFor(default));
await state.UpdateVotedForAsync(member);
False(state.IsVotedFor(default(ClusterMemberId)));
False(state.IsVotedFor(default));
True(state.IsVotedFor(member));
}
finally
Expand All @@ -111,7 +119,7 @@ public static async Task StateManipulations()
try
{
Equal(1, state.Term);
False(state.IsVotedFor(default(ClusterMemberId)));
False(state.IsVotedFor(default));
True(state.IsVotedFor(member));
}
finally
Expand Down Expand Up @@ -149,7 +157,7 @@ public static async Task EmptyLogEntry()
[InlineData(1024, false, 65)]
public static async Task QueryAppendEntries(long partitionSize, bool caching, int concurrentReads)
{
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L, Context = 56 };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
Func<IReadOnlyList<IRaftLogEntry>, long?, CancellationToken, ValueTask<Missing>> checker;
Expand All @@ -175,6 +183,7 @@ public static async Task QueryAppendEntries(long partitionSize, bool caching, in
Equal(0L, entries.First().Term); // element 0
Equal(42L, entries.Skip(1).First().Term); // element 1
Equal(entry1.Content, await entries[1].ToStringAsync(Encoding.UTF8));
Equal(entry1.Context, IsAssignableFrom<IInputLogEntry>(entries[1]).Context);
return Missing.Value;
};

Expand Down Expand Up @@ -460,7 +469,7 @@ public static async Task PartitionOverflow(bool useCaching)
[InlineData(false)]
public static async Task Commit(bool useCaching)
{
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L, Context = 56 };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L };
var entry4 = new TestLogEntry("SET U = 3") { Term = 45L };
Expand Down
8 changes: 7 additions & 1 deletion src/DotNext.Tests/Net/Cluster/Consensus/Raft/TestLogEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
using TextMessage = Messaging.TextMessage;

[ExcludeFromCodeCoverage]
internal sealed class TestLogEntry : TextMessage, IRaftLogEntry
internal sealed class TestLogEntry : TextMessage, IInputLogEntry
{
public TestLogEntry(string command)
: base(command, "Entry")
Expand All @@ -19,4 +19,10 @@ public TestLogEntry(string command)
public long Term { get; set; }

bool ILogEntry.IsSnapshot => false;

public object Context
{
get;
init;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
/// </summary>
/// <typeparam name="T">Binary-formattable type.</typeparam>
[StructLayout(LayoutKind.Auto)]
public readonly struct BinaryLogEntry<T>() : IRaftLogEntry, ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>
public readonly struct BinaryLogEntry<T>() : IInputLogEntry, ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>
where T : notnull, IBinaryFormattable<T>
{
/// <summary>
Expand Down Expand Up @@ -44,6 +44,13 @@ public readonly struct BinaryLogEntry<T>() : IRaftLogEntry, ISupplier<MemoryAllo
/// <inheritdoc />
long? IDataTransferObject.Length => T.Size;

/// <inheritdoc cref="IInputLogEntry.Context"/>
public object? Context
{
get;
init;
}

/// <inheritdoc />
MemoryOwner<byte> ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>.Invoke(MemoryAllocator<byte> allocator)
=> IBinaryFormattable<T>.Format(Content, allocator);
Expand All @@ -57,7 +64,7 @@ ValueTask IDataTransferObject.WriteToAsync<TWriter>(TWriter writer, Cancellation
/// Represents default implementation of <see cref="IRaftLogEntry"/>.
/// </summary>
[StructLayout(LayoutKind.Auto)]
public readonly struct BinaryLogEntry() : IRaftLogEntry, ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>
public readonly struct BinaryLogEntry() : IInputLogEntry, ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>
{
private readonly ReadOnlyMemory<byte> content;

Expand Down Expand Up @@ -98,6 +105,13 @@ public int? CommandId
/// <inheritdoc />
bool IDataTransferObject.IsReusable => true;

/// <inheritdoc cref="IInputLogEntry.Context"/>
public object? Context
{
get;
init;
}

/// <inheritdoc />
bool IDataTransferObject.TryGetMemory(out ReadOnlyMemory<byte> memory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,21 @@ protected DiskBasedStateMachine(string path, int recordsPerPartition, Options? c
Volatile.Write(ref lastTerm, entry.Term);

// Remove log entry from the cache according to eviction policy
var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex;
if (!entry.IsPersisted)
{
await partition.PersistCachedEntryAsync(startIndex, snapshotLength.HasValue).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex || startIndex == partition.LastIndex)
if (lastEntryInPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
partition.ClearContext(startIndex);
}
}
else if (lastEntryInPartition)
{
partition.ClearContext(startIndex);
}

if (snapshotLength.HasValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

namespace DotNext.Net.Cluster.Consensus.Raft;

using DotNext.Buffers;
using IO;
using IO.Log;

/// <summary>
/// Represents No-OP entry.
/// </summary>
[StructLayout(LayoutKind.Auto)]
public readonly struct EmptyLogEntry() : IRaftLogEntry
public readonly struct EmptyLogEntry() : IRaftLogEntry, ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>
{
/// <inheritdoc/>
int? IRaftLogEntry.CommandId => null;
Expand Down Expand Up @@ -47,4 +48,8 @@ ValueTask IDataTransferObject.WriteToAsync<TWriter>(TWriter writer, Cancellation
/// <inheritdoc/>
ValueTask<TResult> IDataTransferObject.TransformAsync<TResult, TTransformation>(TTransformation transformation, CancellationToken token)
=> IDataTransferObject.Empty.TransformAsync<TResult, TTransformation>(transformation, token);

/// <inheritdoc/>
MemoryOwner<byte> ISupplier<MemoryAllocator<byte>, MemoryOwner<byte>>.Invoke(MemoryAllocator<byte> allocator)
=> default;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace DotNext.Net.Cluster.Consensus.Raft;

/// <summary>
/// Represents a custom log entry that can be passed to the log.
/// </summary>
public interface IInputLogEntry : IRaftLogEntry
{
/// <summary>
/// Gets or sets runtime context associated with the log entry.
/// </summary>
object? Context { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
/// </summary>
/// <typeparam name="T">JSON-serializable type.</typeparam>
[StructLayout(LayoutKind.Auto)]
public readonly struct JsonLogEntry<T>() : IRaftLogEntry
public readonly struct JsonLogEntry<T>() : IInputLogEntry
where T : notnull, IJsonSerializable<T>
{
/// <summary>
Expand All @@ -37,6 +37,13 @@ public readonly struct JsonLogEntry<T>() : IRaftLogEntry
/// <inheritdoc />
int? IRaftLogEntry.CommandId => null;

/// <inheritdoc cref="IInputLogEntry.Context"/>
public object? Context
{
get;
init;
}

/// <inheritdoc />
ValueTask IDataTransferObject.WriteToAsync<TWriter>(TWriter writer, CancellationToken token)
=> JsonSerializable<T>.SerializeAsync(writer, Content, token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,8 @@ long ComputeUpperBoundIndex(long count)
/// <seealso cref="Commands.CommandInterpreter"/>
protected abstract ValueTask ApplyAsync(LogEntry entry);

private ValueTask ApplyCoreAsync(LogEntry entry) => entry.IsEmpty ? new() : ApplyAsync(entry); // skip empty log entry
private ValueTask ApplyCoreAsync(LogEntry entry)
=> entry.IsEmpty ? new() : ApplyAsync(entry); // skip empty log entry

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationToken token)
Expand All @@ -686,13 +687,21 @@ private async ValueTask ApplyAsync(int sessionId, long startIndex, CancellationT
Volatile.Write(ref lastTerm, entry.Term);

// Remove log entry from the cache according to eviction policy
var lastEntryInPartition = startIndex == commitIndex || startIndex == partition.LastIndex;
if (!entry.IsPersisted)
{
await partition.PersistCachedEntryAsync(startIndex, evictOnCommit).ConfigureAwait(false);

// Flush partition if we are finished or at the last entry in it
if (startIndex == commitIndex || startIndex == partition.LastIndex)
if (lastEntryInPartition)
{
await partition.FlushAsync(token).ConfigureAwait(false);
partition.ClearContext(startIndex);
}
}
else if (lastEntryInPartition)
{
partition.ClearContext(startIndex);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ namespace DotNext.Net.Cluster.Consensus.Raft;
public partial class PersistentState
{
[StructLayout(LayoutKind.Auto)]
internal struct CacheRecord() : IDisposable
internal struct CacheRecord : IDisposable
{
internal MemoryOwner<byte> Content;
internal CachedLogEntryPersistenceMode PersistenceMode = CachedLogEntryPersistenceMode.CopyToBuffer;
internal CachedLogEntryPersistenceMode PersistenceMode;
internal object? Context;

public void Dispose() => Content.Dispose();
public void Dispose()
{
Context = null;
Content.Dispose();
}
}

internal enum CachedLogEntryPersistenceMode : byte
Expand All @@ -28,10 +33,16 @@ internal enum CachedLogEntryPersistenceMode : byte
/// Represents buffered Raft log entry.
/// </summary>
[StructLayout(LayoutKind.Auto)]
internal readonly struct CachedLogEntry : IRaftLogEntry
internal readonly struct CachedLogEntry : IInputLogEntry
{
private readonly CacheRecord record;

public object? Context
{
get => record.Context;
init => record.Context = value;
}

internal CachedLogEntryPersistenceMode PersistenceMode
{
get => record.PersistenceMode;
Expand All @@ -46,7 +57,7 @@ required internal MemoryOwner<byte> Content

required public long Term { get; init; }

public int? CommandId { get; init; }
required public int? CommandId { get; init; }

internal long Length => record.Content.Length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public partial class PersistentState
/// Use <see cref="TransformAsync"/> to decode the log entry.
/// </remarks>
[StructLayout(LayoutKind.Auto)]
protected internal readonly struct LogEntry : IRaftLogEntry
protected internal readonly struct LogEntry : IInputLogEntry
{
// null (if empty), FileReader, IAsyncBinaryReader, or byte[], or MemoryManager<byte>
private readonly object? content;
Expand Down Expand Up @@ -50,6 +50,15 @@ internal bool IsPersisted
init;
}

/// <summary>
/// Gets or sets context associated with this log entry.
/// </summary>
public object? Context
{
get;
init;
}

internal IAsyncBinaryReader? ContentReader
{
init => content = metadata.Length > 0L ? value : IAsyncBinaryReader.Empty;
Expand Down
Loading

0 comments on commit 1c74d61

Please sign in to comment.