Conversation
|
Important Review skippedMore than 25% of the files skipped due to max files limit. The review is being skipped to prevent a low-quality review. 95 files out of 202 files are above the max files limit of 100. Please upgrade to Pro plan to get higher limits. You can disable this status message by setting the Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
ecadc16 to
75670a9
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR introduces significant asynchronous I/O improvements and adds chunk deletion capabilities to the EventStore codebase. The changes modernize the I/O operations by transitioning from synchronous to asynchronous patterns throughout the transaction log subsystem.
Key changes:
- Converted synchronous I/O operations to async/await patterns across chunk reading, writing, and scavenging
- Introduced
IChunkDeleterinterface andChunkDeleterimplementation for chunk retention/deletion logic - Added
IChunkHandleabstraction for unified chunk file access with async operations - Replaced
BinaryReader/Writerwith modernBufferWriterSlimandSequenceReaderfrom DotNext library - Introduced
IncrementalHashfor streaming hash computation replacingHashAlgorithm
Reviewed Changes
Copilot reviewed 197 out of 197 changed files in this pull request and generated 74 comments.
Show a summary per file
| File | Description |
|---|---|
| src/NuGet.Config | Adds NuGet package source configuration for GitHub packages |
| src/EventStore.TestClient/EventStore.TestClient.csproj | Replaces EventStore.Plugins with TrogonEventStore.Plugins package |
| src/EventStore.Transport.Tcp/Framing/IMessageFramer.cs | Adds async message framing interfaces |
| src/EventStore.Core/Util/MD5Hash.cs | Adds async hash computation with IncrementalHash |
| src/EventStore.Core/Transforms/Identity/IdentityChunkWriteTransform.cs | Converts to async write operations |
| src/EventStore.Core/TransactionLog/Scavenging/Stages/ChunkExecutor.cs | Adds chunk deletion support and thread count validation |
| src/EventStore.Core/TransactionLog/Scavenging/Stages/ChunkDeleter.cs | New chunk deletion implementation with retention logic |
| src/EventStore.Core/TransactionLog/LogRecords/*.cs | Converts log record I/O to async with SequenceReader |
| src/EventStore.Core/TransactionLog/Chunks/TFChunk*.cs | Converts chunk I/O operations to async patterns |
| src/EventStore.Core/Services/Replication/LogRecordFramer.cs | Converts to async message framing |
Comments suppressed due to low confidence (4)
src/EventStore.Core/Services/ClusterStorageWriterService.cs:430
- This foreach loop implicitly filters its target sequence - consider filtering the sequence explicitly using '.Where(...)'.
src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs:454 - These 'if' statements can be combined.
src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs:144 - Generic catch clause.
src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs:390 - Generic catch clause.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| { | ||
| Debug.Assert(stringSize >= 0); | ||
|
|
||
| return stringSize is 0 ? 1 : stringSize + int.Log2(stringSize) / 7 + 1; |
There was a problem hiding this comment.
The calculation int.Log2(stringSize) / 7 + 1 uses integer division which may undercount bytes needed for ULEB128 encoding. For example, with stringSize=128, Log2(128)=7, and 7/7+1=2, but ULEB128 needs 2 bytes for values 128-16383. The calculation should be (int.Log2(stringSize) + 6) / 7 + 1 to ensure correct ceiling division.
| token.ThrowIfCancellationRequested(); | ||
| token = CancellationToken.None; |
There was a problem hiding this comment.
The cancellation token is explicitly set to CancellationToken.None after checking for cancellation, which defeats the purpose of cancellation propagation. This means the async operations below (like CompleteChunkInTransaction, AddNewChunk, Flush) cannot be cancelled once this method starts. This could lead to unresponsive behavior when cancellation is requested. Consider removing line 72 to allow proper cancellation throughout the method.
| token.ThrowIfCancellationRequested(); | ||
| token = CancellationToken.None; |
There was a problem hiding this comment.
Similar to the issue in Write(), the cancellation token is set to CancellationToken.None after checking for cancellation. This prevents CompleteChunkInTransaction from being properly cancelled. Remove line 135 to maintain cancellation capability.
| token.ThrowIfCancellationRequested(); | ||
| token = CancellationToken.None; |
There was a problem hiding this comment.
The cancellation token is set to CancellationToken.None after checking for cancellation, preventing proper cancellation of CompleteReplicatedRawChunkInTransaction. Remove line 156 to maintain cancellation capability.
| using DotNext; | ||
| using DotNext.IO; | ||
| using EventStore.Plugins.Transforms; | ||
| using Microsoft.IO; |
There was a problem hiding this comment.
The Microsoft.IO namespace is imported but not used in this file. Remove this unused import.
| catch (Exception ex) | ||
| { | ||
| lastError = ex; | ||
| _logger.Warning(ex, | ||
| "Unable to determine existence of logical chunk {LogicalChunkNumber} in the archive. Attempt {Attempt}/{MaxAttempts}", | ||
| logicalChunkNumber, attempt + 1, _maxAttempts); | ||
| } |
There was a problem hiding this comment.
Generic catch clause.
| if (context.Request.Query.TryGetValue("liveCode", out var expected) && | ||
| int.TryParse(expected, out var statusCode)) | ||
| { | ||
| context.Response.StatusCode = statusCode; | ||
| } | ||
| return Task.CompletedTask; | ||
| }; | ||
|
|
||
| private static MidFunc GetAndHeadOnly => (context, next) => { | ||
| switch (context.Request.Method) { | ||
| case "HEAD": | ||
| context.Request.Method = "GET"; | ||
| return next(); | ||
| case "GET": | ||
| return next(); | ||
| default: | ||
| context.Response.StatusCode = 405; | ||
| return Task.CompletedTask; | ||
| else | ||
| { | ||
| context.Response.StatusCode = _livecode; | ||
| } |
There was a problem hiding this comment.
Both branches of this 'if' statement write to the same variable - consider using '?' to express intent better.
| if (Version >= (byte)ChunkVersions.Transformed) | ||
| TransformType = (TransformType)reader.Read(); | ||
| else | ||
| TransformType = TransformType.Identity; |
There was a problem hiding this comment.
Both branches of this 'if' statement write to the same variable - consider using '?' to express intent better.
| if (record.IsTombstone || record.IsTransactionBegin) | ||
| { | ||
| return false; | ||
| } | ||
| else | ||
| { | ||
| return true; | ||
| } else { | ||
| // remove all the prepares except | ||
| // - the tombstone itself and | ||
| // - any TransactionBegins (because old scavenge keeps these if there is any | ||
| // doubt about whether it has been committed) | ||
| if (record.IsTombstone || record.IsTransactionBegin) { | ||
| return false; | ||
| } else { | ||
| return true; | ||
| } | ||
| } |
There was a problem hiding this comment.
Both branches of this 'if' statement return - consider using '?' to express intent better.
| if (state.TryGetChunkExecutionInfo(streamId, out var details)) | ||
| { | ||
| return details; | ||
| } | ||
| else | ||
| { | ||
| return new ChunkExecutionInfo( | ||
| isTombstoned: metastreamData.IsTombstoned, | ||
| discardPoint: metastreamData.DiscardPoint, | ||
| isTombstoned: false, | ||
| discardPoint: DiscardPoint.KeepAll, | ||
| maybeDiscardPoint: DiscardPoint.KeepAll, | ||
| maxAge: null); | ||
| } else { | ||
| // original stream | ||
| if (state.TryGetChunkExecutionInfo(streamId, out var details)) { | ||
| return details; | ||
| } else { | ||
| return new ChunkExecutionInfo( | ||
| isTombstoned: false, | ||
| discardPoint: DiscardPoint.KeepAll, | ||
| maybeDiscardPoint: DiscardPoint.KeepAll, | ||
| maxAge: null); | ||
| } | ||
| } |
There was a problem hiding this comment.
Both branches of this 'if' statement return - consider using '?' to express intent better.
* Ignore cancellation within transactions * Move internal API of read side to async * Migrate TryReadForwardRawInternal to async * Prevent opening db that has been closed Signed-off-by: JOSE FUXA <9057699+j-fuxa@users.noreply.github.com> Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Core async infrastructure complete including TFChunk read/write operations, log record serialization, transform infrastructure, HashListMemTable synchronization, and EpochManager.
Note
Major async refactor of core IO and framing, introduce archive (FS/S3) with catch‑up and retention/deletion, overhaul scavenging (thresholds, multi‑thread), and update CI/NuGet/config to support new infrastructure.
NuGet.Config; swap plugins package toTrogonEventStore.Plugins; minor metrics and helper tweaks.Written by Cursor Bugbot for commit 7576188. This will update automatically on new commits. Configure here.