[fix][broker] Fix non-batched null-value messages not removed during topic compaction #25817

Open
grishaf wants to merge 2 commits into apache:master from grishaf:fix/compaction-null-value-tombstone

@grishaf grishaf commented May 19, 2026

Motivation

When a non-Java producer (C++, Python, Go) sends a null-value message (tombstone) on a compacted topic, the key is not removed during topic compaction. The tombstone is retained in the compacted view instead of being deleted.
Root cause:
AbstractTwoPhaseCompactor.extractKeyAndSize() computed payload size using headersAndPayload.readableBytes(), which returns the combined size of the serialized MessageMetadata + payload. For null-value messages the payload is empty, but the metadata is always present, so readableBytes() is always > 0 (e.g. 32 bytes of metadata). This prevents the tombstone path (size <= 0 → latestForKey.remove(key)) from being reached in phase one of compaction.
The batch code path is not affected because it extracts per-message payload sizes from SingleMessageMetadata.payloadSize, which correctly returns 0 for null-value messages. The Java client always sets numMessagesInBatch in the metadata (even with enableBatching(false)), so all Java-produced messages go through the batch path — which is why this bug was never caught by existing tests.
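To make the size mismatch concrete, here is a minimal sketch in plain Java. It does not use Pulsar or Netty classes: `FramedSizeSketch`, `frame`, `combinedSize`, and `payloadOnlySize` are hypothetical names, and the layout (a 4-byte length prefix, then metadata, then payload) is a simplified assumption standing in for the real headers-and-payload buffer.

```java
import java.nio.ByteBuffer;

final class FramedSizeSketch {
    // Assumed, simplified layout: [4-byte metadata length][metadata][payload].
    static ByteBuffer frame(byte[] metadata, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + metadata.length + payload.length);
        buf.putInt(metadata.length).put(metadata).put(payload);
        buf.flip();
        return buf;
    }

    // Models the buggy measurement: every readable byte, metadata included.
    static int combinedSize(ByteBuffer entry) {
        return entry.duplicate().remaining();
    }

    // Models the corrected measurement: skip the metadata, then count what is left.
    static int payloadOnlySize(ByteBuffer entry) {
        ByteBuffer view = entry.duplicate(); // independent position, shared content
        int metadataLength = view.getInt();
        view.position(view.position() + metadataLength);
        return view.remaining();
    }

    public static void main(String[] args) {
        // A tombstone in this model: 32 bytes of metadata and an empty payload.
        ByteBuffer tombstone = frame(new byte[32], new byte[0]);
        System.out.println(combinedSize(tombstone));    // 36 (4-byte prefix + 32 metadata bytes)
        System.out.println(payloadOnlySize(tombstone)); // 0
    }
}
```

In this model the combined size can never reach 0 for a tombstone, while the payload-only size does, which is the condition the `size <= 0` branch depends on.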

Modifications

  • AbstractTwoPhaseCompactor.extractKeyAndSize(): Fixed to compute the correct payload-only size by using Commands.skipMessageMetadata() to skip past the metadata before reading readableBytes(). For compressed messages, getUncompressedSize() was already correct.
  • EventTimeOrderCompactor.extractMessageCompactionData(): Refactored to reuse extractKeyAndSize() instead of duplicating the (buggy) size calculation.

Verifying this change

This change added tests and can be verified as follows:

  • Added CompactionTest.testNonBatchedMessageWithNullValue — writes raw non-batch entries (no numMessagesInBatch in metadata, simulating C++/Python producers) directly to the managed ledger, triggers compaction, and verifies tombstoned keys are removed from the compacted view.
    | Test                               | Without fix                        | With fix |
    |------------------------------------|------------------------------------|----------|
    | testNonBatchedMessageWithNullValue | FAILED: expected [2] but found [4] | PASSED   |

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@grishaf grishaf force-pushed the fix/compaction-null-value-tombstone branch from 74d5fb6 to d34b02d on May 19, 2026 12:05
…topic compaction

When a non-Java producer (C++, Python, Go) sends a null-value message
(tombstone) on a compacted topic, the key is not removed during topic
compaction. The tombstone is retained in the compacted view instead of
being deleted.

Root cause:
`AbstractTwoPhaseCompactor.extractKeyAndSize()` computes payload size
using `headersAndPayload.readableBytes()`, which returns the combined
size of the serialized MessageMetadata + payload. For null-value
messages the payload is empty, but the metadata is always present,
so readableBytes() is always > 0 (e.g. 32 bytes of metadata). This
prevents the tombstone path (`size <= 0 -> latestForKey.remove(key)`)
from being reached in phase one of compaction.

The batch code path is not affected because it extracts per-message
payload sizes from `SingleMessageMetadata.payloadSize`, which correctly
returns 0 for null-value messages. The Java client always sets
`numMessagesInBatch` in the metadata (even with `enableBatching(false)`),
so all Java-produced messages go through the batch path -- which is why
this bug was never caught by existing tests.

Fix:
Check `msgMetadata.hasNullValue() && msgMetadata.isNullValue()` in
`extractKeyAndSize()` and return size 0, so the compaction phase one
correctly removes the key from `latestForKey`.
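The effect of returning size 0 can be sketched with a simplified model of phase one. This is not the compactor's code: `PhaseOneSketch`, `Msg`, and `phaseOne` are hypothetical names, and only the `size <= 0 -> latestForKey.remove(key)` rule is taken from the description above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PhaseOneSketch {
    // Hypothetical message: key, entry id, and the size phase one was given.
    static final class Msg {
        final String key;
        final long entryId;
        final int size;

        Msg(String key, long entryId, int size) {
            this.key = key;
            this.entryId = entryId;
            this.size = size;
        }
    }

    // Keep the latest entry id per key; a size of 0 or less acts as a tombstone.
    static Map<String, Long> phaseOne(List<Msg> messages) {
        Map<String, Long> latestForKey = new LinkedHashMap<>();
        for (Msg m : messages) {
            if (m.size > 0) {
                latestForKey.put(m.key, m.entryId);
            } else {
                latestForKey.remove(m.key);
            }
        }
        return latestForKey;
    }

    public static void main(String[] args) {
        // Tombstone reported with a non-zero (metadata-inflated) size: key survives.
        System.out.println(phaseOne(Arrays.asList(
                new Msg("k1", 0, 10), new Msg("k1", 1, 32))).keySet()); // [k1]
        // Tombstone reported with size 0: key is removed.
        System.out.println(phaseOne(Arrays.asList(
                new Msg("k1", 0, 10), new Msg("k1", 1, 0))).keySet());  // []
    }
}
```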

Also refactored `EventTimeOrderCompactor.extractMessageCompactionData()`
to reuse `extractKeyAndSize()` instead of duplicating the size logic.

Verification (testNonBatchedMessageWithNullValue):
Writes raw non-batch entries (no numMessagesInBatch) to the managed
ledger, triggers compaction, and reads the compacted view.

| Test                                | Without fix                        | With fix |
|-------------------------------------|------------------------------------|----------|
| testNonBatchedMessageWithNullValue  | FAILED: expected [2] but found [4] | PASSED   |

Co-authored-by: Cursor <cursoragent@cursor.com>
@grishaf grishaf force-pushed the fix/compaction-null-value-tombstone branch from d34b02d to 77916ab on May 19, 2026 12:10
Author

grishaf commented May 19, 2026

PR failed on a known flaky test: #25815

@grishaf grishaf requested a review from lhotari May 24, 2026 06:49
Member

lhotari commented May 25, 2026

PR failed on a known flaky test #25815

@grishaf By the way, when a flaky test fails you can retry the CI build by adding the comment /pulsarbot rerun to the PR. This only works once all CI workflow jobs have finished.

Member

lhotari commented May 25, 2026

/pulsarbot rerun

Member

lhotari commented May 25, 2026

@grishaf Please avoid force pushes to a PR once review has started, since they make it harder to track what changed after the previous review. Instead, merge the latest origin/master into your PR branch.

Member

lhotari commented May 25, 2026

I had to trigger a full rebuild because "Pulsar CI" currently caches build artifacts for 3 days, and once the artifacts expire a full rebuild is needed. Merging origin/master into the PR branch is the recommended way to trigger one. Pushing a commit, or closing and re-opening the PR, also works, but neither is recommended for retrying after flaky tests: use /pulsarbot rerun instead unless the artifacts have expired.

Member

@lhotari lhotari left a comment

LGTM, thanks for the contribution @grishaf

} else {
    ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
    Commands.skipMessageMetadata(headersAndPayload);
    payloadSize = headersAndPayload.readableBytes();
Contributor

Should we check msgMetadata.hasNullValue() && msgMetadata.isNullValue() before deriving the size from payload bytes?

The new payload-only calculation fixes plain non-batch tombstones, but it can still keep encrypted tombstones when the producer does not set uncompressedSize. Encryption can turn an empty/null payload into non-empty ciphertext, so after skipMessageMetadata(...), readableBytes() may still be > 0 and phase one will put the tombstone into latestForKey instead of removing the key.

Java producers usually avoid this because ProducerImpl#updateMessageMetadata sets uncompressedSize=0 for null payloads, but this PR is specifically fixing producers whose metadata does not include that size. The explicit nullValue flag is the protocol-level tombstone signal, so I think this helper should return size 0 for nullValue=true before falling back to uncompressedSize or payload bytes.

if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
    payloadSize = 0;
} else if (msgMetadata.hasUncompressedSize()) {
    payloadSize = msgMetadata.getUncompressedSize();
} else {
    ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
    Commands.skipMessageMetadata(headersAndPayload);
    payloadSize = headersAndPayload.readableBytes();
}
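
The precedence suggested above can be exercised in isolation with a small stand-in. This is a sketch only: `TombstoneSizeSketch` and `Meta` are hypothetical, `Meta` models just the two metadata fields the snippet reads (with `null` meaning "field not set"), and the ciphertext length of 16 is an arbitrary illustrative value.

```java
final class TombstoneSizeSketch {
    // Hypothetical stand-in for the metadata fields consulted by the suggestion.
    static final class Meta {
        final Boolean nullValue;        // null models hasNullValue() == false
        final Integer uncompressedSize; // null models hasUncompressedSize() == false

        Meta(Boolean nullValue, Integer uncompressedSize) {
            this.nullValue = nullValue;
            this.uncompressedSize = uncompressedSize;
        }
    }

    // Same precedence as the suggestion: nullValue flag, then uncompressedSize,
    // then the number of bytes left after the metadata has been skipped.
    static int payloadSize(Meta meta, int bytesAfterMetadata) {
        if (Boolean.TRUE.equals(meta.nullValue)) {
            return 0;
        } else if (meta.uncompressedSize != null) {
            return meta.uncompressedSize;
        } else {
            return bytesAfterMetadata;
        }
    }

    public static void main(String[] args) {
        // Encrypted tombstone without uncompressedSize: ciphertext is non-empty,
        // but the explicit flag still yields size 0.
        System.out.println(payloadSize(new Meta(true, null), 16));  // 0
        // Same bytes without the flag: the fallback would keep the key.
        System.out.println(payloadSize(new Meta(null, null), 16));  // 16
    }
}
```

Checking the flag first means the result does not depend on whether encryption or compression changed the number of bytes on the wire.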

Comment on lines +644 to +646
* Write raw non-batch entries directly to the managed ledger, simulating
* messages from C++/Python clients that do not set numMessagesInBatch.
* Verifies that null-value tombstones remove keys during compaction.
Contributor

Small wording nit: this comment says the raw entries simulate C++/Python clients because they do not set numMessagesInBatch. The Java non-batch send path also does not set MessageMetadata.numMessagesInBatch; Java avoids the old bug mainly because ProducerImpl#updateMessageMetadata sets uncompressedSize=0 for null payloads.

Could we reword this as "raw non-batch entries without uncompressedSize, as seen with some non-Java clients" so future readers don't infer that numMessagesInBatch is what protects Java non-batch messages?
