From 2dc14d4b3e8b56351788bf71bbf2c7d7d1ce812e Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Mon, 14 Aug 2023 20:33:29 +0800 Subject: [PATCH] [fix][broker] Fix message loss during topic compaction (#20980) (cherry picked from commit 3ab420cd81c31ebd16213e14580d9e317bc0698d) --- .../pulsar/client/impl/RawBatchConverter.java | 8 ++- .../pulsar/compaction/TwoPhaseCompactor.java | 22 +++++--- .../pulsar/compaction/CompactionTest.java | 56 +++++++++++++++++++ 3 files changed, 75 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 54d2ff867a629..5f476df68af4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -44,6 +44,10 @@ public class RawBatchConverter { public static boolean isReadableBatch(RawMessage msg) { ByteBuf payload = msg.getHeadersAndPayload(); MessageMetadata metadata = Commands.parseMessageMetadata(payload); + return isReadableBatch(metadata); + } + + public static boolean isReadableBatch(MessageMetadata metadata) { return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0; } @@ -71,9 +75,9 @@ public static List> extractIdsAndKey msg.getMessageIdData().getEntryId(), msg.getMessageIdData().getPartition(), i); - if (!smm.isCompactedOut()) { + if (!smm.isCompactedOut() && smm.hasPartitionKey()) { idsAndKeysAndSize.add(ImmutableTriple.of(id, - smm.hasPartitionKey() ? smm.getPartitionKey() : null, + smm.getPartitionKey(), smm.hasPayloadSize() ? smm.getPayloadSize() : 0)); } singleMessagePayload.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 7d3b5863cb6f6..29225253da197 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -122,26 +122,32 @@ private void phaseOneLoop(RawReader reader, () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)")); future.thenAcceptAsync(m -> { - try { + try (m) { MessageId id = m.getMessageId(); boolean deletedMessage = false; boolean replaceMessage = false; mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); - if (RawBatchConverter.isReadableBatch(m)) { + MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); + if (RawBatchConverter.isReadableBatch(metadata)) { try { + int numMessagesInBatch = metadata.getNumMessagesInBatch(); + int deleteCnt = 0; for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch(m)) { if (e != null) { if (e.getRight() > 0) { MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); - replaceMessage = old != null; + if (old != null) { + mxBean.addCompactionRemovedEvent(reader.getTopic()); + } } else { - deletedMessage = true; latestForKey.remove(e.getMiddle()); + deleteCnt++; + mxBean.addCompactionRemovedEvent(reader.getTopic()); } } - if (replaceMessage || deletedMessage) { - mxBean.addCompactionRemovedEvent(reader.getTopic()); - } + } + if (deleteCnt == numMessagesInBatch) { + deletedMessage = true; } } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", @@ -174,8 +180,6 @@ private void phaseOneLoop(RawReader reader, lastMessageId, latestForKey, loopPromise); } - } finally { - m.close(); } }, scheduler).exceptionally(ex -> { loopPromise.completeExceptionally(ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index c8105b011254b..c5dbd9c49aac9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.OpenBuilder; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -96,6 +97,7 @@ import org.testng.annotations.Test; @Test(groups = "broker-impl") +@Slf4j public class CompactionTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; @@ -553,6 +555,60 @@ public void testBatchMessageIdsDontChange() throws Exception { } } + @Test + public void testBatchMessageWithNullValue() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .receiverQueueSize(1).readCompacted(true).subscribe().close(); + + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create() + ) { + // batch 1 + producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync(); + producer.newMessage().key("key1").value(null).sendAsync(); + producer.newMessage().key("key2").value("my-message-3".getBytes()).send(); + + // batch 2 + producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync(); + producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync(); + producer.newMessage().key("key3").value("my-message-6".getBytes()).send(); + + // batch 3 + producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync(); + producer.newMessage().key("key4").value(null).sendAsync(); + producer.newMessage().key("key5").value("my-message-9".getBytes()).send(); + } + + + // compact the topic + compact(topic); + + // Read messages before compaction to get ids + List> messages = new ArrayList<>(); + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) { + while (true) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + messages.add(message); + } + } + + assertEquals(messages.size(), 3); + assertEquals(messages.get(0).getKey(), "key2"); + assertEquals(messages.get(1).getKey(), "key3"); + assertEquals(messages.get(2).getKey(), "key5"); + } + @Test public void testWholeBatchCompactedOut() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1";