From e1d838cc713f1806edc10a40bb02c9eee922ad24 Mon Sep 17 00:00:00 2001 From: Wenzhi Feng <52550727+thetumbled@users.noreply.github.com> Date: Tue, 18 Feb 2025 17:59:52 +0800 Subject: [PATCH] [fix][client] fix retry topic with exclusive mode. (#23859) (cherry picked from commit 5a59ab7768e11db6ed92dada78b398feca9e24fc) --- .../pulsar/client/api/RetryTopicTest.java | 40 +++++++++++++++++++ .../pulsar/client/impl/ConsumerBase.java | 8 ++++ 2 files changed, 48 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index d0e72deb87fc2..2b897760b6f00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -136,6 +136,46 @@ public void testRetryTopic() throws Exception { checkConsumer.close(); } + /** + * Retry topic feature relies on the delay queue feature when consumer produce a delayed message + * to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type. + * As a result, the subscription type of the retry topic should be shared or key-shared. + * @throws Exception + */ + @Test + public void testRetryTopicWithExclusiveMode() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic-exclusive"; + final int maxRedeliveryCount = 2; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Exclusive) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + producer.send("Hello Pulsar".getBytes()); + producer.close(); + + // receive message and set delay to 5 seconds + Message message = consumer.receive(); + long timestamp = System.currentTimeMillis(); + consumer.reconsumeLater(message, 4, TimeUnit.SECONDS); + + // receive message and check the delay is at least 4 seconds + consumer.receive(); + long delay = System.currentTimeMillis() - timestamp; + assertTrue(delay >= 2000); + consumer.close(); + } + @Data public static class Foo { @Nullable diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 1ad8c6d28f1d7..27e2216e58949 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; @@ -792,6 +793,13 @@ private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) { } protected SubType getSubType() { + // For retry topic, we always use Shared subscription + // Because we will produce delayed messages to retry topic. + DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); + if (deadLetterPolicy != null && topic.equals(deadLetterPolicy.getRetryLetterTopic())) { + return SubType.Shared; + } + SubscriptionType type = conf.getSubscriptionType(); switch (type) { case Exclusive: