Skip to content

Commit

Permalink
Attempt to fix #242
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jun 9, 2024
1 parent 8f7d060 commit 6be595d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,34 @@ public static async Task Overwrite(long? maxLogEntrySize)
}
}

[Fact]
public static async Task OverwriteUnsealed()
{
var entry1 = new TestLogEntry("SET A = 0 SET B=1 SET C=2 SET D=3 SET E=4 SET F=5") { Term = 42L };
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
Func<IReadOnlyList<IRaftLogEntry>, long?, CancellationToken, ValueTask<Missing>> checker;
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseCaching = false }))
{
Equal(1L, await state.AppendAsync(entry1));
await state.AppendAsync(entry2, 1L);
}

//read again
using (var state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { UseCaching = false }))
{
checker = async (entries, snapshotIndex, token) =>
{
Null(snapshotIndex);
Single(entries);
False(entries[0].IsSnapshot);
Equal(entry2.Content, await entries[0].ToStringAsync(Encoding.UTF8));
return Missing.Value;
};
await state.As<IRaftLog>().ReadAsync(new LogEntryConsumer(checker), 1L, CancellationToken.None);
}
}

[Obsolete]
[Fact]
public static async Task LegacyOverwrite()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,12 @@ internal override void Initialize()
if (RandomAccess.Read(Handle, header.Span, fileOffset: 0L) < HeaderSize)
{
header.Span.Clear();
writer.FilePosition = HeaderSize;
}
else if (IsSealed)
{
// partition is completed, read table
fileOffset = RandomAccess.GetLength(Handle);
writer.FilePosition = fileOffset = RandomAccess.GetLength(Handle);

if (fileOffset < footer.Length + HeaderSize)
throw new IntegrityException(ExceptionMessages.InvalidPartitionFormat);
Expand All @@ -533,7 +534,7 @@ internal override void Initialize()
fileOffset = HeaderSize;
}

for (Span<byte> metadataBuffer = stackalloc byte[LogEntryMetadata.Size], metadataTable = footer.Span; ; footerOffset += LogEntryMetadata.Size)
for (Span<byte> metadataBuffer = stackalloc byte[LogEntryMetadata.Size], metadataTable = footer.Span; footerOffset < footer.Length; footerOffset += LogEntryMetadata.Size)
{
var count = RandomAccess.Read(Handle, metadataBuffer, fileOffset);
if (count < LogEntryMetadata.Size)
Expand All @@ -543,6 +544,7 @@ internal override void Initialize()
if (fileOffset <= 0L)
break;

writer.FilePosition = fileOffset;
metadataBuffer.CopyTo(metadataTable.Slice(footerOffset, LogEntryMetadata.Size));
}
}
Expand All @@ -562,6 +564,7 @@ private long GetWriteAddress(int index)
protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index, CancellationToken token)
{
var writeAddress = GetWriteAddress(index);
await UnsealIfNeededAsync(writeAddress, token).ConfigureAwait(false);

LogEntryMetadata metadata;
Memory<byte> metadataBuffer;
Expand Down Expand Up @@ -601,6 +604,8 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i
Debug.Assert(writer.HasBufferedData is false);

var writeAddress = GetWriteAddress(index);
await UnsealIfNeededAsync(writeAddress, token).ConfigureAwait(false);

var startPos = writeAddress + LogEntryMetadata.Size;
var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length);
var metadataBuffer = GetMetadataBuffer(index);
Expand All @@ -621,12 +626,59 @@ protected override void OnCached(in CachedLogEntry cachedEntry, int index)
metadata.Format(GetMetadataBuffer(index).Span);
}

private ValueTask UnsealIfNeededAsync(long truncatePosition, CancellationToken token)
{
ValueTask task;
if (IsSealed)
{
task = UnsealAsync(truncatePosition, token);
}
else if (token.IsCancellationRequested)
{
task = ValueTask.FromCanceled(token);
}
else if (truncatePosition < writer.FilePosition)
{
task = new();
try
{
// The caller is trying to rewrite the log entry.
// For a correctness of Initialize() method for unsealed partitions, we
// need to adjust file size. This is expensive syscall which can lead to file fragmentation.
// However, this is acceptable because rare.
RandomAccess.SetLength(Handle, truncatePosition);
}
catch (Exception e)
{
task = ValueTask.FromException(e);
}
}
else
{
task = new();
}

return task;
}

private async ValueTask UnsealAsync(long truncatePosition, CancellationToken token)
{
// This is expensive operation in terms of I/O. However, it is needed only when
// the consumer decided to rewrite the existing log entry, which is rare.
IsSealed = false;
await WriteHeaderAsync(token).ConfigureAwait(false);
RandomAccess.FlushToDisk(Handle);

// destroy all entries in the tail of partition
RandomAccess.SetLength(Handle, truncatePosition);
}

public override ValueTask FlushAsync(CancellationToken token = default)
{
return runningIndex == LastIndex
return IsSealed
? ValueTask.CompletedTask
: runningIndex == LastIndex
? FlushAndSealAsync(token)
: IsSealed
? FlushAndUnsealAsync(token)
: base.FlushAsync(token);
}

Expand All @@ -651,41 +703,7 @@ private async ValueTask FlushAndSealAsync(CancellationToken token)
IsSealed = true;
await WriteHeaderAsync(token).ConfigureAwait(false);

writer.FlushToDisk();
}

private async ValueTask FlushAndUnsealAsync(CancellationToken token)
{
await FlushAndEraseNextEntryAsync(token).ConfigureAwait(false);

IsSealed = false;
await WriteHeaderAsync(token).ConfigureAwait(false);

writer.FlushToDisk();
}

private ValueTask FlushAndEraseNextEntryAsync(CancellationToken token)
{
ValueTask task;

// write the rest of the entry,
// then cleanup next entry header to indicate that the current entry is the last entry
if (!writer.HasBufferedData)
{
task = RandomAccess.WriteAsync(Handle, EmptyMetadata, writer.FilePosition, token);
}
else if (writer.Buffer is { Length: >= LogEntryMetadata.Size } emptyMetadataStub)
{
emptyMetadataStub.Span.Slice(0, LogEntryMetadata.Size).Clear();
writer.Produce(LogEntryMetadata.Size);
task = writer.WriteAsync(token);
}
else
{
task = writer.WriteAsync(EmptyMetadata, token);
}

return task;
RandomAccess.FlushToDisk(Handle);
}

private ValueTask WriteHeaderAsync(CancellationToken token)
Expand Down

0 comments on commit 6be595d

Please sign in to comment.