fix: dedupe S3 metadata after snapshot replay#3370
Conversation
There was a problem hiding this comment.
Pull request overview
Fixes duplicate S3 metadata after snapshot replay on a non-empty image by (1) completing finishSnapshot() for AutoMQ S3/KV deltas so stale entries are removed, (2) making stream-object and stream-set-object replay idempotent by rebuilding DeltaLists keyed by objectId, and (3) adding defensive deduplication in compaction inputs. Cache invalidation in S3StreamSetObject is also wired in for the same-objectId replay case.
Changes:
- Add
finishSnapshot()toS3StreamsMetadataDelta/S3StreamMetadataDelta/NodeS3WALMetadataDelta/S3ObjectsDelta/KVDeltaand invoke them fromMetadataDelta.finishSnapshot(). - Rebuild stream-object and stream-set-object
DeltaListinstances fromLinkedHashMaps keyed byobjectId; invalidateS3StreamSetObjectstatic caches on construction andsortAndEncode. - Defensively deduplicate
S3ObjectMetadatainputs inCompactionManagerandStreamObjectCompactor(group0andcompact0) with warning logs; add tests.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java | Calls the new finishSnapshot() on S3 streams, S3 objects, and KV deltas. |
| metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java | Adds finishSnapshot() that marks unseen streams/nodes deleted and unseen stream end offsets as null; apply() treats null end offsets as removal. |
| metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java | Adds finishSnapshot() for ranges/stream objects; rebuilds stream-object DeltaList keyed by objectId. |
| metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java | Adds finishSnapshot(); rebuilds stream-set-object DeltaList keyed by objectId. |
| metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java | Adds finishSnapshot() removing objects missing from snapshot replay. |
| metadata/src/main/java/org/apache/kafka/image/KVDelta.java | Adds finishSnapshot() removing keys missing from snapshot replay. |
| metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java | Invalidates static range caches on construction and in sortAndEncode. |
| s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java | Defensive dedup of input metadata at multiple stages; exposes streamDataBlockMap() for tests. |
| s3stream/src/main/java/com/automq/stream/s3/compact/StreamObjectCompactor.java | Defensive dedup in compact0 and group0. |
| metadata/src/test/java/org/apache/kafka/image/{NodeRuntimeMetadataImageTest,StreamRuntimeMetadataImageTest,S3ObjectsImageTest,KVImageTest}.java | Regression tests for same-objectId replay and snapshot finalization. |
| s3stream/src/test/java/com/automq/stream/s3/compact/{CompactionManagerTest,StreamObjectCompactorTest}.java | Regression tests for compaction input deduplication. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
d5d6099 to
6b753cb
Compare
| if (addedS3StreamSetObjects.isEmpty() && removedS3StreamSetObjects.isEmpty()) { | ||
| streamSetObjects = image.getObjects(); | ||
| } else if (snapshotReplay) { | ||
| Map<Long, S3StreamSetObject> objects = new LinkedHashMap<>(); |
There was a problem hiding this comment.
Changed the snapshot materialization map to HashMap. The final SSO read paths do not depend on insertion order: orderList() sorts by orderId, and direct object assertions sort by objectId in tests where needed.
| if (changedS3StreamObjects.isEmpty() && removedS3StreamObjectIds.isEmpty()) { | ||
| newS3StreamObjects = image.streamObjects; | ||
| } else if (snapshotReplay) { | ||
| Map<Long, S3StreamObject> objects = new LinkedHashMap<>(); |
There was a problem hiding this comment.
Changed this snapshot materialization map to HashMap as well. S3StreamMetadataImage#getStreamObjects() sorts stream objects by startOffset, so map iteration order is not part of the semantics.
| assertEquals(List.of(new S3StreamSetObject(0L, BROKER0, List.of( | ||
| new StreamOffsetRange(STREAM0, 0L, 100L)), 0L)), objects); | ||
| } | ||
|
|
There was a problem hiding this comment.
- Missing new scenarios
- Missing verification for Delta
There was a problem hiding this comment.
Expanded the snapshot tests to cover update + add + remove in one scenario, and added package-level delta-state assertions for the changed/removed sets. I also added similar delta-state checks for SO, S3 objects, and KV snapshot finalization so the same concern is covered across the touched deltas.
237aba4 to
d2dbdd7
Compare
d2dbdd7 to
d7a40cf
Compare
## Summary - Complete snapshot finalization for AutoMQ S3/KV metadata deltas so snapshot replay removes stale metadata from non-empty images. - In snapshot-finalization mode only, rebuild S3 stream-set-object and stream-object `DeltaList` instances from the final objectId-keyed state. Normal incremental replay keeps the existing `DeltaList.copy()` append/remove path. - Add defensive compaction input deduplication by `objectId` with warning logs. - Add regression coverage for same-`objectId` snapshot replay, snapshot finalization, delta changed/removed sets, and compaction duplicate inputs. ## Tests - `./gradlew :metadata:test --tests 'org.apache.kafka.image.NodeRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.StreamRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.S3ObjectsImageTest' --tests 'org.apache.kafka.image.KVImageTest' --tests 'org.apache.kafka.image.S3StreamsMetadataImageTest'` - `./gradlew :s3stream:test --tests 'com.automq.stream.s3.compact.CompactionManagerTest' --tests 'com.automq.stream.s3.compact.StreamObjectCompactorTest'` - `git diff --check HEAD~1..HEAD` ## Related - Cherry-pick of #3370 to 1.6 - 1.6 keeps the pre-`KVKey` String-keyed KV metadata API, so the KV snapshot finalization backport uses the 1.6 `KVImage.kvs()` path.
## Summary - Complete snapshot finalization for AutoMQ S3/KV metadata deltas so snapshot replay removes stale metadata from non-empty images. - In snapshot-finalization mode only, rebuild S3 stream-set-object and stream-object `DeltaList` instances from the final objectId-keyed state. Normal incremental replay keeps the existing `DeltaList.copy()` append/remove path. - Add defensive compaction input deduplication by `objectId` with warning logs. - Add regression coverage for same-`objectId` snapshot replay, snapshot finalization, delta changed/removed sets, and compaction duplicate inputs. ## Tests - `./gradlew :metadata:test --tests 'org.apache.kafka.image.NodeRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.StreamRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.S3ObjectsImageTest' --tests 'org.apache.kafka.image.KVImageTest' --tests 'org.apache.kafka.image.S3StreamsMetadataImageTest'` - `./gradlew :s3stream:test --tests 'com.automq.stream.s3.compact.CompactionManagerTest' --tests 'com.automq.stream.s3.compact.StreamObjectCompactorTest'` - `git diff --check HEAD~1..HEAD` ## Related - Cherry-pick of #3370 to 1.7
Summary
DeltaListinstances from the final objectId-keyed state. Normal incremental replay keeps the existingDeltaList.copy()append/remove path.objectIdwith warning logs.objectIdsnapshot replay, snapshot finalization, delta changed/removed sets, and compaction duplicate inputs.Closes #3369
Tests
./gradlew :metadata:test --tests 'org.apache.kafka.image.NodeRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.StreamRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.S3ObjectsImageTest' --tests 'org.apache.kafka.image.KVImageTest' --tests 'org.apache.kafka.image.S3StreamsMetadataImageTest'./gradlew :s3stream:test --tests 'com.automq.stream.s3.compact.CompactionManagerTest' --tests 'com.automq.stream.s3.compact.StreamObjectCompactorTest'git diff --check