Skip to content

feat(zerozone): optimize wal replaying via network transfer#3100

Merged
superhx merged 1 commit into
1.6from
light_replay
Dec 12, 2025
Merged

feat(zerozone): optimize wal replaying via network transfer#3100
superhx merged 1 commit into
1.6from
light_replay

Conversation

@superhx
Copy link
Copy Markdown
Collaborator

@superhx superhx commented Dec 11, 2025

No description provided.

Copilot AI review requested due to automatic review settings December 11, 2025 11:18
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR optimizes WAL (Write-Ahead Log) replaying via network transfer in the zerozone feature by implementing a delta-based synchronization mechanism. Instead of forcing clients to replay the entire WAL from storage, the system now transfers incremental WAL data over the network when feasible.

Key changes:

  • Extended AppendResult to include a nextOffset field for tracking valid next record positions
  • Implemented ConfirmWalDataDelta to maintain a bounded in-memory buffer of recent WAL appends that can be piggy-backed in snapshot responses
  • Updated API version to support delta data transfer in AutomqGetPartitionSnapshotResponse version 2

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
AutomqGetPartitionSnapshotRequest.json Bumped API version from 0-1 to 0-2 to support new delta transfer feature
AutomqGetPartitionSnapshotResponse.json Added ConfirmWalDeltaData field in version 2+ for transferring incremental WAL data
AppendResult.java Added nextOffset() method to track the valid offset for the next record
DefaultAppendResult.java Converted from class to record with both recordOffset and nextOffset fields
RecordOffset.java Made interface extend Comparable<RecordOffset> for offset comparison
DefaultRecordOffset.java Implemented compareTo method for offset comparison
DefaultWriter.java Updated to calculate nextOffset with alignment for the last record in each bulk to handle S3 range request limitations
MemoryWriteAheadLog.java Updated to create DefaultAppendResult with proper record and next offsets
ConfirmWAL.java Added listener mechanism to notify when records are appended to the WAL
S3Storage.java Integrated ConfirmWAL listener to propagate append events; refactored link record creation
ConfirmWalDataDelta.java New class implementing delta buffer management with state machine for sync coordination
PartitionSnapshotsManager.java Integrated delta handling into session management with proper cleanup
SnapshotReadCache.java Updated to accept pre-fetched WAL records to avoid redundant WAL reads; changed metrics levels
SubscriberRequester.java Updated to extract and forward delta data from responses
SubscriberReplayer.java Modified to use delta data when available, with error handling
SnapshotReadPartitionsManager.java Updated replay signature to support delta data
Replayer.java Extended interface to accept optional pre-decoded WAL records
DefaultReplayer.java Forwarded delta records to underlying snapshot cache
ZeroZoneMetricsManager.java Changed histogram metrics level from DEBUG to INFO
ConfirmWalDataDeltaTest.java Added comprehensive tests for delta buffer state machine and overflow handling
SnapshotReadPartitionsManagerTest.java Updated mock to match new replay signature
Comments suppressed due to low confidence (1)

s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java:296

  • When the loadCf completes exceptionally (due to errors in line 283 or 294), the task remains in loadingTasks and its walRecords (if provided externally) are never released, causing a memory leak. Tasks that fail should be removed from loadingTasks and their walRecords should be released.
        public void run() {
            long startNanos = time.nanoseconds();
            CompletableFuture<List<StreamRecordBatch>> walRecordsCf = walRecords != null ?
                CompletableFuture.completedFuture(walRecords) : wal.get(startOffset, endOffset);
            walRecordsCf.thenCompose(walRecords -> {
                long readWalDoneNanos = time.nanoseconds();
                READ_WAL_LATENCY.record(readWalDoneNanos - startNanos);
                List<CompletableFuture<StreamRecordBatch>> cfList = new ArrayList<>(walRecords.size());
                for (StreamRecordBatch walRecord : walRecords) {
                    if (walRecord.getCount() >= 0) {
                        cfList.add(CompletableFuture.completedFuture(walRecord));
                    } else {
                        cfList.add(linkRecordDecoder.decode(walRecord));
                    }
                }
                return CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0])).whenComplete((rst, ex) -> {
                    DECODE_LATENCY.record(time.nanoseconds() - readWalDoneNanos);
                    if (ex != null) {
                        loadCf.completeExceptionally(ex);
                        // release other success record
                        cfList.forEach(cf -> cf.thenAccept(StreamRecordBatch::release));
                        return;
                    }
                    records.addAll(cfList.stream().map(CompletableFuture::join).toList());
                    records.forEach(r -> r.encoded(ENCODE_ALLOC));
                    loadCf.complete(null);
                });
            }).whenComplete((rst, ex) -> {
                if (ex != null) {
                    loadCf.completeExceptionally(ex);
                }
            });

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread core/src/main/java/kafka/automq/partition/snapshot/ConfirmWalDataDelta.java Outdated
Comment thread core/src/main/java/kafka/automq/partition/snapshot/ConfirmWalDataDelta.java Outdated
Comment thread core/src/main/java/kafka/automq/partition/snapshot/ConfirmWalDataDelta.java Outdated
Comment thread s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java Outdated
Comment thread core/src/main/java/kafka/automq/zerozone/SubscriberReplayer.java
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
@superhx superhx merged commit bfd8848 into 1.6 Dec 12, 2025
6 checks passed
@superhx superhx deleted the light_replay branch December 12, 2025 06:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants