[fix][broker] Fix non-batched null-value messages not removed during topic compaction#25817
[fix][broker] Fix non-batched null-value messages not removed during topic compaction#25817grishaf wants to merge 2 commits into
Conversation
74d5fb6 to
d34b02d
Compare
…topic compaction When a non-Java producer (C++, Python, Go) sends a null-value message (tombstone) on a compacted topic, the key is not removed during topic compaction. The tombstone is retained in the compacted view instead of being deleted. Root cause: `AbstractTwoPhaseCompactor.extractKeyAndSize()` computes payload size using `headersAndPayload.readableBytes()`, which returns the combined size of the serialized MessageMetadata + payload. For null-value messages the payload is empty, but the metadata is always present, so readableBytes() is always > 0 (e.g. 32 bytes of metadata). This prevents the tombstone path (`size <= 0 -> latestForKey.remove(key)`) from being reached in phase one of compaction. The batch code path is not affected because it extracts per-message payload sizes from `SingleMessageMetadata.payloadSize`, which correctly returns 0 for null-value messages. The Java client always sets `numMessagesInBatch` in the metadata (even with `enableBatching(false)`), so all Java-produced messages go through the batch path -- which is why this bug was never caught by existing tests. Fix: Check `msgMetadata.hasNullValue() && msgMetadata.isNullValue()` in `extractKeyAndSize()` and return size 0, so the compaction phase one correctly removes the key from `latestForKey`. Also refactored `EventTimeOrderCompactor.extractMessageCompactionData()` to reuse `extractKeyAndSize()` instead of duplicating the size logic. Verification (testNonBatchedMessageWithNullValue): Writes raw non-batch entries (no numMessagesInBatch) to the managed ledger, triggers compaction, and reads the compacted view. | Test | Without fix | With fix | |-------------------------------------|------------------------------------|----------| | testNonBatchedMessageWithNullValue | FAILED: expected [2] but found [4] | PASSED | Co-authored-by: Cursor <cursoragent@cursor.com>
d34b02d to
77916ab
Compare
|
PR failed on a known flaky test |
|
/pulsarbot rerun |
|
@grishaf Please avoid force pushes to a PR where the review has already started since it makes it harder to track changes after the previous review. Instead, merge the latest |
|
I had to trigger a full rebuild since "Pulsar CI" currently caches build artifacts for 3 days and if the artifacts expire, there's a need to trigger a full rebuild. Merging origin/master to the PR branch is a recommended approach. Another way to trigger is to push a commit or close and re-open the PR (however that's not a recommended approach for retrying after flaky tests, |
| } else { | ||
| ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate(); | ||
| Commands.skipMessageMetadata(headersAndPayload); | ||
| payloadSize = headersAndPayload.readableBytes(); |
There was a problem hiding this comment.
Should we check msgMetadata.hasNullValue() && msgMetadata.isNullValue() before deriving the size from payload bytes?
The new payload-only calculation fixes plain non-batch tombstones, but it can still keep encrypted tombstones when the producer does not set uncompressedSize. Encryption can turn an empty/null payload into non-empty ciphertext, so after skipMessageMetadata(...), readableBytes() may still be > 0 and phase one will put the tombstone into latestForKey instead of removing the key.
Java producers usually avoid this because ProducerImpl#updateMessageMetadata sets uncompressedSize=0 for null payloads, but this PR is specifically fixing producers whose metadata does not include that size. The explicit nullValue flag is the protocol-level tombstone signal, so I think this helper should return size 0 for nullValue=true before falling back to uncompressedSize or payload bytes.
if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
payloadSize = 0;
} else if (msgMetadata.hasUncompressedSize()) {
payloadSize = msgMetadata.getUncompressedSize();
} else {
ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
Commands.skipMessageMetadata(headersAndPayload);
payloadSize = headersAndPayload.readableBytes();
}| * Write raw non-batch entries directly to the managed ledger, simulating | ||
| * messages from C++/Python clients that do not set numMessagesInBatch. | ||
| * Verifies that null-value tombstones remove keys during compaction. |
There was a problem hiding this comment.
Small wording nit: this comment says the raw entries simulate C++/Python clients because they do not set numMessagesInBatch. The Java non-batch send path also does not set MessageMetadata.numMessagesInBatch; Java avoids the old bug mainly because ProducerImpl#updateMessageMetadata sets uncompressedSize=0 for null payloads.
Could we reword this as "raw non-batch entries without uncompressedSize, as seen with some non-Java clients" so future readers don't infer that numMessagesInBatch is what protects Java non-batch messages?
Motivation
When a non-Java producer (C++, Python, Go) sends a null-value message (tombstone) on a compacted topic, the key is not removed during topic compaction. The tombstone is retained in the compacted view instead of being deleted.
Root cause:
AbstractTwoPhaseCompactor.extractKeyAndSize()computed payload size usingheadersAndPayload.readableBytes(), which returns the combined size of the serialized MessageMetadata + payload. For null-value messages the payload is empty, but the metadata is always present, soreadableBytes()is always > 0 (e.g. 32 bytes of metadata). This prevents the tombstone path (size <= 0 → latestForKey.remove(key)) from being reached in phase one of compaction.The batch code path is not affected because it extracts per-message payload sizes from
SingleMessageMetadata.payloadSize, which correctly returns 0 for null-value messages. The Java client always setsnumMessagesInBatchin the metadata (even withenableBatching(false)), so all Java-produced messages go through the batch path — which is why this bug was never caught by existing tests.Modifications
AbstractTwoPhaseCompactor.extractKeyAndSize(): Fixed to compute the correct payload-only size by usingCommands.skipMessageMetadata()to skip past the metadata before readingreadableBytes(). For compressed messages,getUncompressedSize()was already correct.EventTimeOrderCompactor.extractMessageCompactionData(): Refactored to reuseextractKeyAndSize()instead of duplicating the (buggy) size calculation.Verifying this change
This change added tests and can be verified as follows:
CompactionTest.testNonBatchedMessageWithNullValue— writes raw non-batch entries (nonumMessagesInBatchin metadata, simulating C++/Python producers) directly to the managed ledger, triggers compaction, and verifies tombstoned keys are removed from the compacted view.| Test | Without fix | With fix |
|-------------------------------------|------------------------------------|----------|
| testNonBatchedMessageWithNullValue | FAILED: expected [2] but found [4] | PASSED |
Does this pull request potentially affect one of the following parts: