Skip to content

fix: dedupe S3 metadata after snapshot replay#3370

Merged
Gezi-lzq merged 1 commit into
mainfrom
codex/issue-3369-s3-metadata-snapshot-dedupe
May 29, 2026
Merged

fix: dedupe S3 metadata after snapshot replay#3370
Gezi-lzq merged 1 commit into
mainfrom
codex/issue-3369-s3-metadata-snapshot-dedupe

Conversation

@Gezi-lzq
Copy link
Copy Markdown
Contributor

@Gezi-lzq Gezi-lzq commented May 27, 2026

Summary

  • Complete snapshot finalization for AutoMQ S3/KV metadata deltas so snapshot replay removes stale metadata from non-empty images.
  • In snapshot-finalization mode only, rebuild S3 stream-set-object and stream-object DeltaList instances from the final objectId-keyed state. Normal incremental replay keeps the existing DeltaList.copy() append/remove path.
  • Add defensive compaction input deduplication by objectId with warning logs.
  • Add regression coverage for same-objectId snapshot replay, snapshot finalization, delta changed/removed sets, and compaction duplicate inputs.

Closes #3369

Tests

  • ./gradlew :metadata:test --tests 'org.apache.kafka.image.NodeRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.StreamRuntimeMetadataImageTest' --tests 'org.apache.kafka.image.S3ObjectsImageTest' --tests 'org.apache.kafka.image.KVImageTest' --tests 'org.apache.kafka.image.S3StreamsMetadataImageTest'
  • ./gradlew :s3stream:test --tests 'com.automq.stream.s3.compact.CompactionManagerTest' --tests 'com.automq.stream.s3.compact.StreamObjectCompactorTest'
  • git diff --check

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes duplicate S3 metadata after snapshot replay on a non-empty image by (1) completing finishSnapshot() for AutoMQ S3/KV deltas so stale entries are removed, (2) making stream-object and stream-set-object replay idempotent by rebuilding DeltaLists keyed by objectId, and (3) adding defensive deduplication in compaction inputs. Cache invalidation in S3StreamSetObject is also wired in for the same-objectId replay case.

Changes:

  • Add finishSnapshot() to S3StreamsMetadataDelta/S3StreamMetadataDelta/NodeS3WALMetadataDelta/S3ObjectsDelta/KVDelta and invoke them from MetadataDelta.finishSnapshot().
  • Rebuild stream-object and stream-set-object DeltaList instances from LinkedHashMaps keyed by objectId; invalidate S3StreamSetObject static caches on construction and sortAndEncode.
  • Defensively deduplicate S3ObjectMetadata inputs in CompactionManager and StreamObjectCompactor (group0 and compact0) with warning logs; add tests.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated no comments.

Show a summary per file
File Description
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java Calls the new finishSnapshot() on S3 streams, S3 objects, and KV deltas.
metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java Adds finishSnapshot() that marks unseen streams/nodes deleted and unseen stream end offsets as null; apply() treats null end offsets as removal.
metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java Adds finishSnapshot() for ranges/stream objects; rebuilds stream-object DeltaList keyed by objectId.
metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java Adds finishSnapshot(); rebuilds stream-set-object DeltaList keyed by objectId.
metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java Adds finishSnapshot() removing objects missing from snapshot replay.
metadata/src/main/java/org/apache/kafka/image/KVDelta.java Adds finishSnapshot() removing keys missing from snapshot replay.
metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java Invalidates static range caches on construction and in sortAndEncode.
s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java Defensive dedup of input metadata at multiple stages; exposes streamDataBlockMap() for tests.
s3stream/src/main/java/com/automq/stream/s3/compact/StreamObjectCompactor.java Defensive dedup in compact0 and group0.
metadata/src/test/java/org/apache/kafka/image/{NodeRuntimeMetadataImageTest,StreamRuntimeMetadataImageTest,S3ObjectsImageTest,KVImageTest}.java Regression tests for same-objectId replay and snapshot finalization.
s3stream/src/test/java/com/automq/stream/s3/compact/{CompactionManagerTest,StreamObjectCompactorTest}.java Regression tests for compaction input deduplication.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@Gezi-lzq Gezi-lzq force-pushed the codex/issue-3369-s3-metadata-snapshot-dedupe branch 2 times, most recently from d5d6099 to 6b753cb Compare May 27, 2026 09:50
@Gezi-lzq Gezi-lzq requested a review from Copilot May 28, 2026 07:27
@Gezi-lzq Gezi-lzq marked this pull request as ready for review May 28, 2026 07:27
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated no new comments.

if (addedS3StreamSetObjects.isEmpty() && removedS3StreamSetObjects.isEmpty()) {
streamSetObjects = image.getObjects();
} else if (snapshotReplay) {
Map<Long, S3StreamSetObject> objects = new LinkedHashMap<>();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe HashMap is enough

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the snapshot materialization map to HashMap. The final SSO read paths do not depend on insertion order: orderList() sorts by orderId, and direct object assertions sort by objectId in tests where needed.

if (changedS3StreamObjects.isEmpty() && removedS3StreamObjectIds.isEmpty()) {
newS3StreamObjects = image.streamObjects;
} else if (snapshotReplay) {
Map<Long, S3StreamObject> objects = new LinkedHashMap<>();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same as above

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this snapshot materialization map to HashMap as well. S3StreamMetadataImage#getStreamObjects() sorts stream objects by startOffset, so map iteration order is not part of the semantics.

assertEquals(List.of(new S3StreamSetObject(0L, BROKER0, List.of(
new StreamOffsetRange(STREAM0, 0L, 100L)), 0L)), objects);
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Missing new scenarios
  • Missing verification for Delta

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded the snapshot tests to cover update + add + remove in one scenario, and added package-level delta-state assertions for the changed/removed sets. I also added similar delta-state checks for SO, S3 objects, and KV snapshot finalization so the same concern is covered across the touched deltas.

@Gezi-lzq Gezi-lzq force-pushed the codex/issue-3369-s3-metadata-snapshot-dedupe branch 2 times, most recently from 237aba4 to d2dbdd7 Compare May 28, 2026 13:53
@Gezi-lzq Gezi-lzq force-pushed the codex/issue-3369-s3-metadata-snapshot-dedupe branch from d2dbdd7 to d7a40cf Compare May 28, 2026 15:00
@Gezi-lzq Gezi-lzq enabled auto-merge (squash) May 29, 2026 03:14
@Gezi-lzq Gezi-lzq merged commit 8cce597 into main May 29, 2026
6 checks passed
@Gezi-lzq Gezi-lzq deleted the codex/issue-3369-s3-metadata-snapshot-dedupe branch May 29, 2026 03:35
Gezi-lzq added a commit that referenced this pull request May 29, 2026
## Summary

- Complete snapshot finalization for AutoMQ S3/KV metadata deltas so
snapshot replay removes stale metadata from non-empty images.
- In snapshot-finalization mode only, rebuild S3 stream-set-object and
stream-object `DeltaList` instances from the final objectId-keyed state.
Normal incremental replay keeps the existing `DeltaList.copy()`
append/remove path.
- Add defensive compaction input deduplication by `objectId` with
warning logs.
- Add regression coverage for same-`objectId` snapshot replay, snapshot
finalization, delta changed/removed sets, and compaction duplicate
inputs.

## Tests

- `./gradlew :metadata:test --tests
'org.apache.kafka.image.NodeRuntimeMetadataImageTest' --tests
'org.apache.kafka.image.StreamRuntimeMetadataImageTest' --tests
'org.apache.kafka.image.S3ObjectsImageTest' --tests
'org.apache.kafka.image.KVImageTest' --tests
'org.apache.kafka.image.S3StreamsMetadataImageTest'`
- `./gradlew :s3stream:test --tests
'com.automq.stream.s3.compact.CompactionManagerTest' --tests
'com.automq.stream.s3.compact.StreamObjectCompactorTest'`
- `git diff --check HEAD~1..HEAD`

## Related

- Cherry-pick of #3370 to 1.6
- 1.6 keeps the pre-`KVKey` String-keyed KV metadata API, so the KV
snapshot finalization backport uses the 1.6 `KVImage.kvs()` path.
Gezi-lzq added a commit that referenced this pull request May 29, 2026
## Summary

- Complete snapshot finalization for AutoMQ S3/KV metadata deltas so
snapshot replay removes stale metadata from non-empty images.
- In snapshot-finalization mode only, rebuild S3 stream-set-object and
stream-object `DeltaList` instances from the final objectId-keyed state.
Normal incremental replay keeps the existing `DeltaList.copy()`
append/remove path.
- Add defensive compaction input deduplication by `objectId` with
warning logs.
- Add regression coverage for same-`objectId` snapshot replay, snapshot
finalization, delta changed/removed sets, and compaction duplicate
inputs.

## Tests

- `./gradlew :metadata:test --tests
'org.apache.kafka.image.NodeRuntimeMetadataImageTest' --tests
'org.apache.kafka.image.StreamRuntimeMetadataImageTest' --tests
'org.apache.kafka.image.S3ObjectsImageTest' --tests
'org.apache.kafka.image.KVImageTest' --tests
'org.apache.kafka.image.S3StreamsMetadataImageTest'`
- `./gradlew :s3stream:test --tests
'com.automq.stream.s3.compact.CompactionManagerTest' --tests
'com.automq.stream.s3.compact.StreamObjectCompactorTest'`
- `git diff --check HEAD~1..HEAD`

## Related

- Cherry-pick of #3370 to 1.7
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fix duplicate S3 stream objects after snapshot replay on non-empty metadata image

3 participants