Skip to content

Commit c5cc1f9

Browse files
committed
Polish "Allow dead letter topics to be non-fully-qualified"
Adds to the previous commit by... - adding a test for the fully qualified dead/retry letter topics - de-duplicate the logic for the retry and dead letter topic Signed-off-by: onobc <chris.bono@gmail.com>
1 parent 64a9262 commit c5cc1f9

File tree

4 files changed

+95
-25
lines changed

4 files changed

+95
-25
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Collections;
2020
import java.util.List;
21+
import java.util.function.Supplier;
2122
import java.util.regex.Pattern;
2223

2324
import org.apache.pulsar.client.api.Schema;
@@ -27,6 +28,7 @@
2728
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.pulsar.core.PulsarTopicBuilder;
31+
import org.springframework.util.Assert;
3032
import org.springframework.util.CollectionUtils;
3133
import org.springframework.util.StringUtils;
3234

@@ -101,17 +103,23 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
101103
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
102104
mutableSpec.setTopicNames(fullyQualifiedTopics);
103105
}
106+
104107
if (mutableSpec.getDeadLetterPolicy() != null) {
105-
var dlt = mutableSpec.getDeadLetterPolicy().getDeadLetterTopic();
106-
if (dlt != null) {
107-
mutableSpec.getDeadLetterPolicy()
108-
.setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt));
109-
}
110-
var rlt = mutableSpec.getDeadLetterPolicy().getRetryLetterTopic();
111-
if (rlt != null) {
112-
mutableSpec.getDeadLetterPolicy()
113-
.setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt));
114-
}
108+
var deadLetterPolicy = mutableSpec.getDeadLetterPolicy();
109+
fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getDeadLetterTopic,
110+
deadLetterPolicy::setDeadLetterTopic);
111+
fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getRetryLetterTopic,
112+
deadLetterPolicy::setRetryLetterTopic);
113+
}
114+
}
115+
116+
protected void fullyQualifyDeadLetterPolicyTopic(Supplier<String> topicGetter,
117+
java.util.function.Consumer<String> topicSetter) {
118+
Assert.notNull(this.topicBuilder, "topicBuilder must not be null");
119+
var topicName = topicGetter.get();
120+
if (StringUtils.hasText(topicName)) {
121+
var fqTopicName = this.topicBuilder.getFullyQualifiedNameForTopic(topicName);
122+
topicSetter.accept(fqTopicName);
115123
}
116124
}
117125

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.List;
2525
import java.util.regex.Pattern;
2626

27+
import org.apache.pulsar.client.api.DeadLetterPolicy;
2728
import org.apache.pulsar.client.api.PulsarClient;
29+
import org.apache.pulsar.client.api.PulsarClientException;
2830
import org.apache.pulsar.client.api.Schema;
2931
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
3032
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
@@ -145,6 +147,33 @@ void createConsumerEnsureTopicsPatternFullyQualified() {
145147
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern);
146148
}
147149

150+
@Test
151+
void createConsumerEnsureDeadLetterPolicyTopicsFullyQualified() throws PulsarClientException {
152+
var topicBuilder = spy(new PulsarTopicBuilder());
153+
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
154+
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
155+
consumerFactory.setTopicBuilder(topicBuilder);
156+
var deadLetterTopic = "with-topic-builder-reactive-ensure-dlp-dlt-fq";
157+
var retryLetterTopic = "%s-retry".formatted(deadLetterTopic);
158+
var deadLetterPolicy = DeadLetterPolicy.builder()
159+
.maxRedeliverCount(2)
160+
.deadLetterTopic(deadLetterTopic)
161+
.retryLetterTopic(retryLetterTopic)
162+
.build();
163+
var consumer = consumerFactory.createConsumer(SCHEMA,
164+
Collections.singletonList(builder -> builder.deadLetterPolicy(deadLetterPolicy)));
165+
var reactiveMessageConsumerSpec = assertThat(consumer)
166+
.extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class))
167+
.actual();
168+
169+
assertThat(reactiveMessageConsumerSpec.getDeadLetterPolicy().getDeadLetterTopic())
170+
.isEqualTo("persistent://public/default/%s".formatted(deadLetterTopic));
171+
assertThat(reactiveMessageConsumerSpec.getDeadLetterPolicy().getRetryLetterTopic())
172+
.isEqualTo("persistent://public/default/%s".formatted(retryLetterTopic));
173+
verify(topicBuilder).getFullyQualifiedNameForTopic(deadLetterTopic);
174+
verify(topicBuilder).getFullyQualifiedNameForTopic(retryLetterTopic);
175+
}
176+
148177
}
149178

150179
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import java.util.Objects;
2525
import java.util.TreeMap;
26+
import java.util.function.Supplier;
2627
import java.util.regex.Pattern;
2728

2829
import org.apache.pulsar.client.api.Consumer;
@@ -34,6 +35,7 @@
3435
import org.jspecify.annotations.Nullable;
3536

3637
import org.springframework.pulsar.PulsarException;
38+
import org.springframework.util.Assert;
3739
import org.springframework.util.CollectionUtils;
3840
import org.springframework.util.StringUtils;
3941

@@ -149,19 +151,23 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
149151
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
150152
builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics));
151153
}
154+
152155
if (builderImpl.getConf().getDeadLetterPolicy() != null) {
153-
var dlt = builderImpl.getConf().getDeadLetterPolicy().getDeadLetterTopic();
154-
if (dlt != null) {
155-
builderImpl.getConf()
156-
.getDeadLetterPolicy()
157-
.setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt));
158-
}
159-
var rlt = builderImpl.getConf().getDeadLetterPolicy().getRetryLetterTopic();
160-
if (rlt != null) {
161-
builderImpl.getConf()
162-
.getDeadLetterPolicy()
163-
.setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt));
164-
}
156+
var deadLetterPolicy = builderImpl.getConf().getDeadLetterPolicy();
157+
fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getDeadLetterTopic,
158+
deadLetterPolicy::setDeadLetterTopic);
159+
fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getRetryLetterTopic,
160+
deadLetterPolicy::setRetryLetterTopic);
161+
}
162+
}
163+
164+
protected void fullyQualifyDeadLetterPolicyTopic(Supplier<String> topicGetter,
165+
java.util.function.Consumer<String> topicSetter) {
166+
Assert.notNull(this.topicBuilder, "topicBuilder must not be null");
167+
var topicName = topicGetter.get();
168+
if (StringUtils.hasText(topicName)) {
169+
var fqTopicName = this.topicBuilder.getFullyQualifiedNameForTopic(topicName);
170+
topicSetter.accept(fqTopicName);
165171
}
166172
}
167173

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import org.apache.pulsar.client.api.Consumer;
3333
import org.apache.pulsar.client.api.ConsumerBuilder;
34+
import org.apache.pulsar.client.api.DeadLetterPolicy;
3435
import org.apache.pulsar.client.api.PulsarClient;
3536
import org.apache.pulsar.client.api.PulsarClientException;
3637
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
@@ -273,10 +274,10 @@ void multipleConfigCustomizers() throws PulsarClientException {
273274
}
274275

275276
@Nested
276-
class CreateConsumerUsingPulsarTopicBuilder {
277+
class CreateConsumerWithTopicBuilder {
277278

278279
@Test
279-
void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException {
280+
void ensureTopicNamesFullyQualified() throws PulsarClientException {
280281
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
281282
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
282283
null);
@@ -289,7 +290,7 @@ void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientE
289290
}
290291

291292
@Test
292-
void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException {
293+
void ensureTopicsPatternFullyQualified() throws PulsarClientException {
293294
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
294295
ConsumerBuilderCustomizer<String> customizer = (builder) -> builder.topicsPattern("topic-.*");
295296
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
@@ -307,6 +308,32 @@ void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClie
307308
}
308309
}
309310

311+
@Test
312+
void ensureDeadLetterPolicyTopicsFullyQualified() throws PulsarClientException {
313+
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
314+
var deadLetterTopic = "with-pulsar-topic-builder-ensure-dlp-dlt-fq";
315+
var retryLetterTopic = "%s-retry".formatted(deadLetterTopic);
316+
var deadLetterPolicy = DeadLetterPolicy.builder()
317+
.maxRedeliverCount(2)
318+
.deadLetterTopic(deadLetterTopic)
319+
.retryLetterTopic(retryLetterTopic)
320+
.build();
321+
ConsumerBuilderCustomizer<String> customizer = (builder) -> builder.deadLetterPolicy(deadLetterPolicy);
322+
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
323+
List.of(customizer));
324+
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
325+
326+
try (Consumer<String> consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
327+
"%s-sub".formatted(deadLetterTopic), null, null)) {
328+
assertThat(consumer).extracting("deadLetterPolicy.deadLetterTopic")
329+
.isEqualTo("persistent://public/default/%s".formatted(deadLetterTopic));
330+
assertThat(consumer).extracting("deadLetterPolicy.retryLetterTopic")
331+
.isEqualTo("persistent://public/default/%s".formatted(retryLetterTopic));
332+
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic(deadLetterTopic);
333+
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic(retryLetterTopic);
334+
}
335+
}
336+
310337
// TODO remove when Pulsar client updates to 4.2.0
311338
private void temporarilyDealWithPulsar24698(PatternMultiTopicsConsumerImpl<String> consumer) {
312339
// See https://github.com/apache/pulsar/pull/24698

0 commit comments

Comments
 (0)