diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 5e3731fbf24fc..2a0cb3187d208 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -46,9 +47,10 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -@Test(groups = "flaky") +@Test(groups = "broker-impl") public class DeadLetterTopicTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class); @@ -56,6 +58,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { + this.conf.setMaxMessageSize(5 * 1024); super.internalSetup(); super.producerBaseSetup(); } @@ -66,6 +69,15 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + private String createMessagePayload(int size) { + StringBuilder str = new StringBuilder(); + Random rand = new Random(); + for (int i = 0; i < size; i++) { + str.append(rand.nextInt(10)); + } + return str.toString(); + } + @Test public void testDeadLetterTopicWithMessageKey() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; @@ -125,9 +137,13 @@ public void testDeadLetterTopicWithMessageKey() throws Exception { consumer.close(); } + @DataProvider(name = "produceLargeMessages") + public Object[][] produceLargeMessages() { + return new Object[][] { { false }, { true } }; + } - @Test(groups = "quarantine") - public void testDeadLetterTopic() throws Exception { + @Test(dataProvider = "produceLargeMessages") + public void testDeadLetterTopic(boolean produceLargeMessages) throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final int maxRedeliveryCount = 2; @@ -154,28 +170,44 @@ public void testDeadLetterTopic() throws Exception { Producer producer = pulsarClient.newProducer(Schema.BYTES) .topic(topic) + .enableChunking(produceLargeMessages) + .enableBatching(!produceLargeMessages) .create(); + Map messageContent = new HashMap<>(); + for (int i = 0; i < sendMessages; i++) { - producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + String data; + if (!produceLargeMessages) { + data = String.format("Hello Pulsar [%d]", i); + } else { + data = createMessagePayload(1024 * 10); + } + producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send(); + messageContent.put(i, data); } producer.close(); int totalReceived = 0; do { - Message message = consumer.receive(); - log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + Message message = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(message, "The consumer should be able to receive messages."); + log.info("consumer received message : {}", message.getMessageId()); totalReceived++; } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); int totalInDeadLetter = 0; do { - Message message = deadLetterConsumer.receive(); - log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS); + assertNotNull(message, "the deadLetterConsumer should receive messages."); + assertEquals(new String(message.getData()), messageContent.get(Integer.parseInt(message.getKey()))); + messageContent.remove(Integer.parseInt(message.getKey())); + log.info("dead letter consumer received message : {}", message.getMessageId()); deadLetterConsumer.acknowledge(message); totalInDeadLetter++; } while (totalInDeadLetter < sendMessages); + assertTrue(messageContent.isEmpty()); deadLetterConsumer.close(); consumer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6a3b894364a36..52f90a7a0d087 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -613,6 +613,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) .topic(this.deadLetterPolicy.getRetryLetterTopic()) .enableBatching(false) + .enableChunking(true) .blockIfQueueFull(false) .create(); } @@ -1460,7 +1461,8 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) { // means we lost the first chunk: should never happen - log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId, + log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}", topic, + subscription, msgId, (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId()); if (chunkedMsgCtx != null) { if (chunkedMsgCtx.chunkedMsgBuffer != null) { @@ -2104,6 +2106,8 @@ private void initDeadLetterProducerIfNeeded() { .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) .topic(this.deadLetterPolicy.getDeadLetterTopic()) .blockIfQueueFull(false) + .enableBatching(false) + .enableChunking(true) .createAsync(); } } finally { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java index c8b18524ec052..a0d1446ba3d55 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java @@ -64,6 +64,9 @@ static boolean isBatch(MessageIdAdv msgId) { } static MessageIdAdv discardBatch(MessageId messageId) { + if (messageId instanceof ChunkMessageIdImpl) { + return (MessageIdAdv) messageId; + } MessageIdAdv msgId = (MessageIdAdv) messageId; return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9bd1421d0355b..a17d4a06f02a6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -698,6 +698,12 @@ private void serializeAndSendMessage(MessageImpl msg, op = OpSendMsg.create(msg, null, sequenceId, callback); final MessageMetadata finalMsgMetadata = msgMetadata; op.rePopulate = () -> { + if (msgMetadata.hasChunkId()) { + // The message metadata is shared between all chunks in a large message + // We need to reset the chunk id for each call of this method + // It's safe to do that because there is only 1 thread to manipulate this message metadata + finalMsgMetadata.setChunkId(chunkId); + } op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId, finalMsgMetadata, encryptedPayload); }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 69f86a1a89f2c..20ec9c3d99af4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -138,8 +138,11 @@ public void run(Timeout t) throws Exception { if (!headPartition.isEmpty()) { log.info("[{}] {} messages will be re-delivered", consumerBase, headPartition.size()); headPartition.forEach(messageId -> { - addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); - messageIds.add(messageId); + if (messageId instanceof ChunkMessageIdImpl) { + addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); + } else { + messageIds.add(messageId); + } messageIdPartitionMap.remove(messageId); }); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java index 4ccc514e8e7f1..f6c668703d9db 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java @@ -29,12 +29,15 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; - +import java.time.Duration; import java.util.HashSet; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class UnAckedMessageTrackerTest { @@ -77,4 +80,48 @@ public void testAddAndRemove() { timer.stop(); } + @Test + public void testTrackChunkedMessageId() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), + 1, TimeUnit.MILLISECONDS); + when(client.timer()).thenReturn(timer); + + ConsumerBase consumer = mock(ConsumerBase.class); + doNothing().when(consumer).onAckTimeoutSend(any()); + doNothing().when(consumer).redeliverUnacknowledgedMessages(any()); + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAckTimeoutMillis(1000); + conf.setTickDurationMillis(1000); + UnAckedMessageTracker tracker = new UnAckedMessageTracker(client, consumer, conf); + + assertTrue(tracker.isEmpty()); + assertEquals(tracker.size(), 0); + + // Build chunked message ID + MessageIdImpl[] chunkMsgIds = new MessageIdImpl[5]; + for (int i = 0; i < 5; i++) { + chunkMsgIds[i] = new MessageIdImpl(1L, i, -1); + } + ChunkMessageIdImpl chunkedMessageId = + new ChunkMessageIdImpl(chunkMsgIds[0], chunkMsgIds[chunkMsgIds.length - 1]); + + consumer.unAckedChunkedMessageIdSequenceMap = + ConcurrentOpenHashMap.newBuilder().build(); + consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId, chunkMsgIds); + + // Redeliver chunked message + tracker.add(chunkedMessageId); + + Awaitility.await() + .pollInterval(Duration.ofMillis(200)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertEquals(tracker.size(), 0)); + + // Assert that all chunk message ID are removed from unAckedChunkedMessageIdSequenceMap + assertEquals(consumer.unAckedChunkedMessageIdSequenceMap.size(), 0); + + timer.stop(); + } + }