Skip to content

Commit a285a7c

Browse files
committed
Use type safe structures to configure the consumer
1 parent f466ce8 commit a285a7c

File tree

14 files changed

+514
-186
lines changed

14 files changed

+514
-186
lines changed

spring-pulsar-docs/src/main/asciidoc/pulsar.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,7 @@ class DeadLetterPolicyConfig {
998998
999999
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
10001000
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
1001-
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1" })
1001+
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
10021002
void listen(String msg) {
10031003
throw new RuntimeException("fail " + msg);
10041004
}

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.pulsar.client.api.RegexSubscriptionMode;
3535
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3636
import org.apache.pulsar.client.api.SubscriptionType;
37+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
3738
import org.apache.pulsar.common.schema.SchemaType;
3839

3940
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -90,8 +91,8 @@ public Admin getAdministration() {
9091
return this.admin;
9192
}
9293

93-
public Map<String, Object> buildConsumerProperties() {
94-
return new HashMap<>(this.consumer.buildProperties());
94+
public ConsumerConfigurationData<?> buildConsumerProperties() {
95+
return this.consumer.buildProperties();
9596
}
9697

9798
public Map<String, Object> buildClientProperties() {
@@ -415,38 +416,38 @@ public void setExpireTimeOfIncompleteChunkedMessageMillis(long expireTimeOfIncom
415416
this.expireTimeOfIncompleteChunkedMessageMillis = expireTimeOfIncompleteChunkedMessageMillis;
416417
}
417418

418-
public Map<String, Object> buildProperties() {
419-
PulsarProperties.Properties properties = new Properties();
419+
public ConsumerConfigurationData<?> buildProperties() {
420+
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
420421

421422
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
422423

423-
map.from(this::getTopics).as(Set::of).to(properties.in("topicNames"));
424-
map.from(this::getTopicsPattern).as(Pattern::compile).to(properties.in("topicsPattern"));
425-
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));
426-
map.from(this::getSubscriptionType).to(properties.in("subscriptionType"));
427-
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
428-
map.from(this::getAcknowledgementsGroupTimeMicros).to(properties.in("acknowledgementsGroupTimeMicros"));
429-
map.from(this::getNegativeAckRedeliveryDelayMicros).to(properties.in("negativeAckRedeliveryDelayMicros"));
424+
map.from(this::getTopics).as(Set::of).to(conf::setTopicNames);
425+
map.from(this::getTopicsPattern).as(Pattern::compile).to(conf::setTopicsPattern);
426+
map.from(this::getSubscriptionName).to(conf::setSubscriptionName);
427+
map.from(this::getSubscriptionType).to(conf::setSubscriptionType);
428+
map.from(this::getReceiverQueueSize).to(conf::setReceiverQueueSize);
429+
map.from(this::getAcknowledgementsGroupTimeMicros).to(conf::setAcknowledgementsGroupTimeMicros);
430+
map.from(this::getNegativeAckRedeliveryDelayMicros).to(conf::setNegativeAckRedeliveryDelayMicros);
430431
map.from(this::getMaxTotalReceiverQueueSizeAcrossPartitions)
431-
.to(properties.in("maxTotalReceiverQueueSizeAcrossPartitions"));
432-
map.from(this::getConsumerName).to(properties.in("consumerName"));
433-
map.from(this::getAckTimeoutMillis).to(properties.in("ackTimeoutMillis"));
434-
map.from(this::getTickDurationMillis).to(properties.in("tickDurationMillis"));
435-
map.from(this::getPriorityLevel).to(properties.in("priorityLevel"));
436-
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
437-
map.from(this::getProperties).to(properties.in("properties"));
438-
map.from(this::isReadCompacted).to(properties.in("readCompacted"));
439-
map.from(this::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition"));
440-
map.from(this::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod"));
441-
map.from(this::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode"));
442-
map.from(this::isAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
443-
map.from(this::isReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
432+
.to(conf::setMaxTotalReceiverQueueSizeAcrossPartitions);
433+
map.from(this::getConsumerName).to(conf::setConsumerName);
434+
map.from(this::getAckTimeoutMillis).to(conf::setAckTimeoutMillis);
435+
map.from(this::getTickDurationMillis).to(conf::setTickDurationMillis);
436+
map.from(this::getPriorityLevel).to(conf::setPriorityLevel);
437+
map.from(this::getCryptoFailureAction).to(conf::setCryptoFailureAction);
438+
map.from(this::getProperties).to(conf::setProperties);
439+
map.from(this::isReadCompacted).to(conf::setReadCompacted);
440+
map.from(this::getSubscriptionInitialPosition).to(conf::setSubscriptionInitialPosition);
441+
map.from(this::getPatternAutoDiscoveryPeriod).to(conf::setPatternAutoDiscoveryPeriod);
442+
map.from(this::getRegexSubscriptionMode).to(conf::setRegexSubscriptionMode);
443+
map.from(this::isAutoUpdatePartitions).to(conf::setAutoUpdatePartitions);
444+
map.from(this::isReplicateSubscriptionState).to(conf::setReplicateSubscriptionState);
444445
map.from(this::isAutoAckOldestChunkedMessageOnQueueFull)
445-
.to(properties.in("autoAckOldestChunkedMessageOnQueueFull"));
446-
map.from(this::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage"));
446+
.to(conf::setAutoAckOldestChunkedMessageOnQueueFull);
447+
map.from(this::getMaxPendingChunkedMessage).to(conf::setMaxPendingChunkedMessage);
447448
map.from(this::getExpireTimeOfIncompleteChunkedMessageMillis)
448-
.to(properties.in("expireTimeOfIncompleteChunkedMessageMillis"));
449-
return properties;
449+
.to(conf::setExpireTimeOfIncompleteChunkedMessageMillis);
450+
return conf;
450451
}
451452

452453
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.core;
18+
19+
import org.apache.pulsar.client.api.ConsumerBuilder;
20+
21+
/**
22+
* The interface to customize a {@link ConsumerBuilder}.
23+
*
24+
* @param <T> The message payload type
25+
* @author Christophe Bornet
26+
*/
27+
@FunctionalInterface
28+
public interface ConsumerBuilderCustomizer<T> {
29+
30+
/**
31+
* Customizes a {@link ConsumerBuilder}.
32+
* @param consumerBuilder the builder to customize
33+
*/
34+
void customize(ConsumerBuilder<T> consumerBuilder);
35+
36+
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 144 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,22 @@
1717
package org.springframework.pulsar.core;
1818

1919
import java.util.ArrayList;
20-
import java.util.HashMap;
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.HashSet;
2123
import java.util.List;
2224
import java.util.Map;
25+
import java.util.TreeMap;
26+
import java.util.concurrent.TimeUnit;
2327

24-
import org.apache.pulsar.client.api.BatchReceivePolicy;
2528
import org.apache.pulsar.client.api.Consumer;
2629
import org.apache.pulsar.client.api.ConsumerBuilder;
27-
import org.apache.pulsar.client.api.DeadLetterPolicy;
2830
import org.apache.pulsar.client.api.PulsarClient;
2931
import org.apache.pulsar.client.api.PulsarClientException;
30-
import org.apache.pulsar.client.api.RedeliveryBackoff;
3132
import org.apache.pulsar.client.api.Schema;
33+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
3234

35+
import org.springframework.lang.Nullable;
3336
import org.springframework.util.CollectionUtils;
3437

3538
/**
@@ -38,82 +41,173 @@
3841
* @param <T> underlying payload type for the consumer.
3942
* @author Soby Chacko
4043
* @author Alexander Preuß
44+
* @author Christophe Bornet
4145
*/
4246
public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T> {
4347

44-
private final Map<String, Object> consumerConfig = new HashMap<>();
48+
private final ConsumerConfigurationData<T> consumerConfig;
4549

4650
private final List<Consumer<T>> consumers = new ArrayList<>();
4751

48-
private PulsarClient pulsarClient;
52+
private final PulsarClient pulsarClient;
4953

50-
public DefaultPulsarConsumerFactory(PulsarClient pulsarClient, Map<String, Object> consumerConfig) {
54+
public DefaultPulsarConsumerFactory(PulsarClient pulsarClient, ConsumerConfigurationData<T> consumerConfig) {
5155
this.pulsarClient = pulsarClient;
52-
if (!CollectionUtils.isEmpty(consumerConfig)) {
53-
this.consumerConfig.putAll(consumerConfig);
54-
}
56+
this.consumerConfig = consumerConfig != null ? consumerConfig : new ConsumerConfigurationData<>();
5557
}
5658

5759
@Override
58-
public Consumer<T> createConsumer(Schema<T> schema, Map<String, Object> propertiesToOverride)
59-
throws PulsarClientException {
60-
61-
final ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);
60+
public Consumer<T> createConsumer(Schema<T> schema) throws PulsarClientException {
61+
return createConsumer(schema, Collections.emptyList());
62+
}
6263

63-
final Map<String, Object> properties = new HashMap<>(this.consumerConfig);
64-
properties.putAll(propertiesToOverride);
64+
@Override
65+
public Consumer<T> createConsumer(Schema<T> schema, List<ConsumerBuilderCustomizer<T>> customizers)
66+
throws PulsarClientException {
6567

66-
if (!CollectionUtils.isEmpty(properties)) {
67-
consumerBuilder.loadConf(properties);
68-
}
69-
Consumer<T> consumer = consumerBuilder.subscribe();
70-
this.consumers.add(consumer);
71-
return consumer;
68+
return createConsumer(schema, null, null, Collections.emptyList());
7269
}
7370

7471
@Override
75-
public Consumer<T> createConsumer(Schema<T> schema, BatchReceivePolicy batchReceivePolicy,
76-
Map<String, Object> propertiesToOverride) throws PulsarClientException {
77-
72+
public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String> topics,
73+
@Nullable Map<String, String> properties, @Nullable List<ConsumerBuilderCustomizer<T>> customizers)
74+
throws PulsarClientException {
7875
final ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);
79-
final Map<String, Object> properties = new HashMap<>(this.consumerConfig);
80-
properties.putAll(propertiesToOverride);
8176

82-
// Remove deadLetterPolicy from the properties here and save it to re-apply after
83-
// calling `loadConf` (https://github.com/apache/pulsar/issues/11646)
84-
DeadLetterPolicy deadLetterPolicy = null;
85-
if (properties.containsKey("deadLetterPolicy")) {
86-
deadLetterPolicy = (DeadLetterPolicy) properties.remove("deadLetterPolicy");
77+
ConsumerConfigurationData<T> config = this.consumerConfig.clone();
78+
if (topics != null) {
79+
config.setTopicNames(new HashSet<>(topics));
8780
}
88-
89-
if (!CollectionUtils.isEmpty(properties)) {
90-
consumerBuilder.loadConf(properties);
91-
}
92-
93-
if (deadLetterPolicy != null) {
94-
consumerBuilder.deadLetterPolicy(deadLetterPolicy);
81+
if (properties != null) {
82+
config.setProperties(new TreeMap<>(properties));
9583
}
9684

97-
if (properties.containsKey("negativeAckRedeliveryBackoff")) {
98-
final RedeliveryBackoff negativeAckRedeliveryBackoff = (RedeliveryBackoff) properties
99-
.get("negativeAckRedeliveryBackoff");
100-
consumerBuilder.negativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
101-
}
85+
loadConf(consumerBuilder, config);
10286

103-
if (properties.containsKey("ackTimeoutRedeliveryBackoff")) {
104-
final RedeliveryBackoff ackTimeoutRedeliveryBackoff = (RedeliveryBackoff) properties
105-
.get("ackTimeoutRedeliveryBackoff");
106-
consumerBuilder.ackTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
87+
if (!CollectionUtils.isEmpty(customizers)) {
88+
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
10789
}
10890

109-
consumerBuilder.batchReceivePolicy(batchReceivePolicy);
11091
Consumer<T> consumer = consumerBuilder.subscribe();
11192
this.consumers.add(consumer);
11293
return consumer;
11394
}
11495

115-
public Map<String, Object> getConsumerConfig() {
96+
public ConsumerConfigurationData<T> getConsumerConfig() {
11697
return this.consumerConfig;
11798
}
11899

100+
private static <T> void loadConf(ConsumerBuilder<T> consumer, ConsumerConfigurationData<T> conf) {
101+
if (!CollectionUtils.isEmpty(conf.getTopicNames())) {
102+
consumer.topics(conf.getTopicNames().stream().toList());
103+
}
104+
if (conf.getTopicsPattern() != null) {
105+
consumer.topicsPattern(conf.getTopicsPattern());
106+
}
107+
if (conf.getSubscriptionName() != null) {
108+
consumer.subscriptionName(conf.getSubscriptionName());
109+
}
110+
if (conf.getSubscriptionType() != null) {
111+
consumer.subscriptionType(conf.getSubscriptionType());
112+
}
113+
if (!CollectionUtils.isEmpty(conf.getSubscriptionProperties())) {
114+
consumer.subscriptionProperties(conf.getSubscriptionProperties());
115+
}
116+
if (conf.getSubscriptionMode() != null) {
117+
consumer.subscriptionMode(conf.getSubscriptionMode());
118+
}
119+
if (conf.getMessageListener() != null) {
120+
consumer.messageListener(conf.getMessageListener());
121+
}
122+
if (conf.getConsumerEventListener() != null) {
123+
consumer.consumerEventListener(conf.getConsumerEventListener());
124+
}
125+
if (conf.getNegativeAckRedeliveryBackoff() != null) {
126+
consumer.negativeAckRedeliveryBackoff(conf.getNegativeAckRedeliveryBackoff());
127+
}
128+
if (conf.getAckTimeoutRedeliveryBackoff() != null) {
129+
consumer.ackTimeoutRedeliveryBackoff(conf.getAckTimeoutRedeliveryBackoff());
130+
}
131+
if (conf.getReceiverQueueSize() >= 0) {
132+
consumer.receiverQueueSize(conf.getReceiverQueueSize());
133+
}
134+
if (conf.getAcknowledgementsGroupTimeMicros() >= 0) {
135+
consumer.acknowledgmentGroupTime(conf.getAcknowledgementsGroupTimeMicros(), TimeUnit.MICROSECONDS);
136+
}
137+
if (conf.getNegativeAckRedeliveryDelayMicros() >= 0) {
138+
consumer.negativeAckRedeliveryDelay(conf.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MICROSECONDS);
139+
}
140+
if (conf.getMaxTotalReceiverQueueSizeAcrossPartitions() >= 0) {
141+
consumer.maxTotalReceiverQueueSizeAcrossPartitions(conf.getMaxTotalReceiverQueueSizeAcrossPartitions());
142+
}
143+
if (conf.getConsumerName() != null) {
144+
consumer.consumerName(conf.getConsumerName());
145+
}
146+
if (conf.getAckTimeoutMillis() >= 0) {
147+
consumer.ackTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
148+
}
149+
if (conf.getTickDurationMillis() >= 0) {
150+
consumer.ackTimeoutTickTime(conf.getTickDurationMillis(), TimeUnit.MILLISECONDS);
151+
}
152+
if (conf.getPriorityLevel() >= 0) {
153+
consumer.priorityLevel(conf.getPriorityLevel());
154+
}
155+
if (conf.getMaxPendingChunkedMessage() >= 0) {
156+
consumer.maxPendingChunkedMessage(conf.getMaxPendingChunkedMessage());
157+
}
158+
consumer.autoAckOldestChunkedMessageOnQueueFull(conf.isAutoAckOldestChunkedMessageOnQueueFull());
159+
if (conf.getExpireTimeOfIncompleteChunkedMessageMillis() >= 0) {
160+
consumer.expireTimeOfIncompleteChunkedMessage(conf.getExpireTimeOfIncompleteChunkedMessageMillis(),
161+
TimeUnit.MILLISECONDS);
162+
}
163+
if (conf.getCryptoKeyReader() != null) {
164+
consumer.cryptoKeyReader(conf.getCryptoKeyReader());
165+
}
166+
if (conf.getMessageCrypto() != null) {
167+
consumer.messageCrypto(conf.getMessageCrypto());
168+
}
169+
if (conf.getCryptoFailureAction() != null) {
170+
consumer.cryptoFailureAction(conf.getCryptoFailureAction());
171+
}
172+
if (!CollectionUtils.isEmpty(conf.getProperties())) {
173+
consumer.properties(conf.getProperties());
174+
}
175+
consumer.readCompacted(conf.isReadCompacted());
176+
if (conf.getSubscriptionInitialPosition() != null) {
177+
consumer.subscriptionInitialPosition(conf.getSubscriptionInitialPosition());
178+
}
179+
if (conf.getPatternAutoDiscoveryPeriod() >= 0) {
180+
consumer.patternAutoDiscoveryPeriod(conf.getPatternAutoDiscoveryPeriod(), TimeUnit.SECONDS);
181+
}
182+
if (conf.getRegexSubscriptionMode() != null) {
183+
consumer.subscriptionTopicsMode(conf.getRegexSubscriptionMode());
184+
}
185+
if (conf.getDeadLetterPolicy() != null) {
186+
consumer.deadLetterPolicy(conf.getDeadLetterPolicy());
187+
}
188+
consumer.enableRetry(conf.isRetryEnable());
189+
if (conf.getBatchReceivePolicy() != null) {
190+
consumer.batchReceivePolicy(conf.getBatchReceivePolicy());
191+
}
192+
consumer.autoUpdatePartitions(conf.isAutoUpdatePartitions());
193+
if (conf.getAutoUpdatePartitionsIntervalSeconds() >= 0) {
194+
consumer.autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsIntervalSeconds(),
195+
TimeUnit.SECONDS);
196+
}
197+
consumer.replicateSubscriptionState(conf.isReplicateSubscriptionState());
198+
if (conf.isResetIncludeHead()) {
199+
consumer.startMessageIdInclusive();
200+
}
201+
if (conf.getKeySharedPolicy() != null) {
202+
consumer.keySharedPolicy(conf.getKeySharedPolicy());
203+
}
204+
consumer.enableBatchIndexAcknowledgment(conf.isBatchIndexAckEnabled());
205+
consumer.isAckReceiptEnabled(conf.isAckReceiptEnabled());
206+
consumer.poolMessages(conf.isPoolMessages());
207+
if (conf.getPayloadProcessor() != null) {
208+
consumer.messagePayloadProcessor(conf.getPayloadProcessor());
209+
}
210+
consumer.startPaused(conf.isStartPaused());
211+
}
212+
119213
}

0 commit comments

Comments
 (0)