Skip to content

Commit 64a9262

Browse files
therepaniconobc
authored andcommitted
Allow dead letter topics to be non-fully-qualified.
The consumer factories now respect the configured PulsarTopicBuilder and will fully-qualify the `deadLetterTopic` and `retryLetterTopic` on the `DeadLetterPolicy`. This allows users to not have to fully-qualify these topic names. Resolves #1241 Signed-off-by: Andrey Litvitski <andrey1010102008@gmail.com>
1 parent 79f9b74 commit 64a9262

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,18 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
101101
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
102102
mutableSpec.setTopicNames(fullyQualifiedTopics);
103103
}
104+
if (mutableSpec.getDeadLetterPolicy() != null) {
105+
var dlt = mutableSpec.getDeadLetterPolicy().getDeadLetterTopic();
106+
if (dlt != null) {
107+
mutableSpec.getDeadLetterPolicy()
108+
.setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt));
109+
}
110+
var rlt = mutableSpec.getDeadLetterPolicy().getRetryLetterTopic();
111+
if (rlt != null) {
112+
mutableSpec.getDeadLetterPolicy()
113+
.setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt));
114+
}
115+
}
104116
}
105117

106118
protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder<T> consumerBuilder) {

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,20 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
149149
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
150150
builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics));
151151
}
152+
if (builderImpl.getConf().getDeadLetterPolicy() != null) {
153+
var dlt = builderImpl.getConf().getDeadLetterPolicy().getDeadLetterTopic();
154+
if (dlt != null) {
155+
builderImpl.getConf()
156+
.getDeadLetterPolicy()
157+
.setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt));
158+
}
159+
var rlt = builderImpl.getConf().getDeadLetterPolicy().getRetryLetterTopic();
160+
if (rlt != null) {
161+
builderImpl.getConf()
162+
.getDeadLetterPolicy()
163+
.setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt));
164+
}
165+
}
152166
}
153167

154168
protected void ensureTopicsPatternFullyQualified(ConsumerBuilder<T> builder) {

0 commit comments

Comments
 (0)