Skip to content

Commit dbd1b69

Browse files
authored
Use builder to autoconfigure PulsarClient (#394)
- Apply PulsarProperties to client builder rather than raw map of Pulsar properties - Replace PulsarClientFactoryBean with PulsarClientFactory
1 parent 303573b commit dbd1b69

File tree

17 files changed

+750
-251
lines changed

17 files changed

+750
-251
lines changed

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.List;
26-
import java.util.Map;
2726
import java.util.Objects;
2827
import java.util.concurrent.BlockingQueue;
2928
import java.util.concurrent.CountDownLatch;
@@ -36,6 +35,7 @@
3635
import org.apache.pulsar.client.api.Message;
3736
import org.apache.pulsar.client.api.MessageId;
3837
import org.apache.pulsar.client.api.PulsarClient;
38+
import org.apache.pulsar.client.api.PulsarClientException;
3939
import org.apache.pulsar.client.api.Schema;
4040
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
4141
import org.apache.pulsar.client.api.SubscriptionType;
@@ -57,7 +57,7 @@
5757
import org.springframework.context.annotation.Configuration;
5858
import org.springframework.messaging.handler.annotation.Header;
5959
import org.springframework.pulsar.annotation.EnablePulsar;
60-
import org.springframework.pulsar.config.PulsarClientFactoryBean;
60+
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
6161
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
6262
import org.springframework.pulsar.core.DefaultSchemaResolver;
6363
import org.springframework.pulsar.core.DefaultTopicResolver;
@@ -111,8 +111,8 @@ public PulsarProducerFactory<String> pulsarProducerFactory(PulsarClient pulsarCl
111111
}
112112

113113
@Bean
114-
public PulsarClientFactoryBean pulsarClientFactoryBean() {
115-
return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
114+
public PulsarClient pulsarClient() throws PulsarClientException {
115+
return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient();
116116
}
117117

118118
@Bean

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Optional;
2020

2121
import org.apache.pulsar.client.api.PulsarClient;
22+
import org.apache.pulsar.client.api.PulsarClientException;
2223
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
2324

2425
import org.springframework.beans.factory.ObjectProvider;
@@ -30,14 +31,15 @@
3031
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3132
import org.springframework.context.annotation.Bean;
3233
import org.springframework.context.annotation.Import;
33-
import org.springframework.pulsar.config.PulsarClientFactoryBean;
3434
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
35+
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
3536
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
3637
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
3738
import org.springframework.pulsar.core.DefaultPulsarReaderFactory;
3839
import org.springframework.pulsar.core.DefaultSchemaResolver;
3940
import org.springframework.pulsar.core.DefaultTopicResolver;
4041
import org.springframework.pulsar.core.PulsarAdministration;
42+
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
4143
import org.springframework.pulsar.core.PulsarConsumerFactory;
4244
import org.springframework.pulsar.core.PulsarProducerFactory;
4345
import org.springframework.pulsar.core.PulsarReaderFactory;
@@ -71,8 +73,21 @@ public PulsarAutoConfiguration(PulsarProperties properties) {
7173

7274
@Bean
7375
@ConditionalOnMissingBean
74-
public PulsarClientFactoryBean pulsarClientFactoryBean() {
75-
return new PulsarClientFactoryBean(this.properties.buildClientProperties());
76+
public PulsarClientBuilderConfigurer pulsarClientBuilderConfigurer(PulsarProperties pulsarProperties,
77+
ObjectProvider<PulsarClientBuilderCustomizer> customizers) {
78+
return new PulsarClientBuilderConfigurer(pulsarProperties, customizers.orderedStream().toList());
79+
}
80+
81+
@Bean
82+
@ConditionalOnMissingBean
83+
public PulsarClient pulsarClient(PulsarClientBuilderConfigurer configurer) {
84+
var clientFactory = new DefaultPulsarClientFactory(configurer::configure);
85+
try {
86+
return clientFactory.createClient();
87+
}
88+
catch (PulsarClientException ex) {
89+
throw new IllegalArgumentException("Failed to create client: " + ex.getMessage(), ex);
90+
}
7691
}
7792

7893
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.autoconfigure;
18+
19+
import java.net.InetSocketAddress;
20+
import java.net.URI;
21+
import java.time.Duration;
22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import org.apache.pulsar.client.api.ClientBuilder;
26+
import org.apache.pulsar.client.api.PulsarClientException;
27+
import org.apache.pulsar.client.api.SizeUnit;
28+
29+
import org.springframework.boot.context.properties.PropertyMapper;
30+
import org.springframework.boot.util.LambdaSafe;
31+
import org.springframework.pulsar.autoconfigure.PulsarProperties.Client;
32+
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
33+
import org.springframework.util.Assert;
34+
import org.springframework.util.CollectionUtils;
35+
import org.springframework.util.StringUtils;
36+
import org.springframework.util.unit.DataSize;
37+
38+
/**
39+
* Configure Pulsar {@link ClientBuilder} with sensible defaults and apply a list of
40+
* optional {@link PulsarClientBuilderCustomizer customizers}.
41+
*
42+
* @author Chris Bono
43+
*/
44+
public class PulsarClientBuilderConfigurer {
45+
46+
private final PulsarProperties properties;
47+
48+
private final List<PulsarClientBuilderCustomizer> customizers;
49+
50+
/**
51+
* Creates a new configurer that will use the given properties for configuration.
52+
* @param properties properties to use
53+
* @param customizers list of customizers to apply or empty list if no customizers
54+
*/
55+
public PulsarClientBuilderConfigurer(PulsarProperties properties, List<PulsarClientBuilderCustomizer> customizers) {
56+
Assert.notNull(properties, "properties must not be null");
57+
Assert.notNull(customizers, "customizers must not be null");
58+
this.properties = properties;
59+
this.customizers = customizers;
60+
}
61+
62+
/**
63+
* Configure the specified {@link ClientBuilder}. The builder can be further tuned and
64+
* default settings can be overridden.
65+
* @param clientBuilder the {@link ClientBuilder} instance to configure
66+
*/
67+
public void configure(ClientBuilder clientBuilder) {
68+
applyProperties(this.properties, clientBuilder);
69+
applyCustomizers(this.customizers, clientBuilder);
70+
}
71+
72+
@SuppressWarnings("deprecation")
73+
protected void applyProperties(PulsarProperties pulsarProperties, ClientBuilder clientBuilder) {
74+
var map = PropertyMapper.get().alwaysApplyingWhenNonNull();
75+
var clientProperties = pulsarProperties.getClient();
76+
map.from(clientProperties::getServiceUrl).to(clientBuilder::serviceUrl);
77+
map.from(clientProperties::getListenerName).to(clientBuilder::listenerName);
78+
map.from(clientProperties::getNumIoThreads).to(clientBuilder::ioThreads);
79+
map.from(clientProperties::getNumListenerThreads).to(clientBuilder::listenerThreads);
80+
map.from(clientProperties::getNumConnectionsPerBroker).to(clientBuilder::connectionsPerBroker);
81+
map.from(clientProperties::getMaxConcurrentLookupRequest).to(clientBuilder::maxConcurrentLookupRequests);
82+
map.from(clientProperties::getMaxLookupRequest).to(clientBuilder::maxLookupRequests);
83+
map.from(clientProperties::getMaxLookupRedirects).to(clientBuilder::maxLookupRedirects);
84+
map.from(clientProperties::getMaxNumberOfRejectedRequestPerConnection)
85+
.to(clientBuilder::maxNumberOfRejectedRequestPerConnection);
86+
map.from(clientProperties::getUseTcpNoDelay).to(clientBuilder::enableTcpNoDelay);
87+
88+
// Authentication properties
89+
applyAuthentication(clientProperties, clientBuilder);
90+
91+
// TLS properties
92+
map.from(clientProperties::getUseTls).to(clientBuilder::enableTls);
93+
map.from(clientProperties::getTlsHostnameVerificationEnable).to(clientBuilder::enableTlsHostnameVerification);
94+
map.from(clientProperties::getTlsTrustCertsFilePath).to(clientBuilder::tlsTrustCertsFilePath);
95+
map.from(clientProperties::getTlsCertificateFilePath).to(clientBuilder::tlsCertificateFilePath);
96+
map.from(clientProperties::getTlsKeyFilePath).to(clientBuilder::tlsKeyFilePath);
97+
map.from(clientProperties::getTlsAllowInsecureConnection).to(clientBuilder::allowTlsInsecureConnection);
98+
map.from(clientProperties::getUseKeyStoreTls).to(clientBuilder::useKeyStoreTls);
99+
map.from(clientProperties::getSslProvider).to(clientBuilder::sslProvider);
100+
map.from(clientProperties::getTlsTrustStoreType).to(clientBuilder::tlsTrustStoreType);
101+
map.from(clientProperties::getTlsTrustStorePath).to(clientBuilder::tlsTrustStorePath);
102+
map.from(clientProperties::getTlsTrustStorePassword).to(clientBuilder::tlsTrustStorePassword);
103+
map.from(clientProperties::getTlsCiphers).to(clientBuilder::tlsCiphers);
104+
map.from(clientProperties::getTlsProtocols).to(clientBuilder::tlsProtocols);
105+
106+
map.from(clientProperties::getStatsInterval).as(Duration::toSeconds).to(clientBuilder,
107+
(cb, val) -> cb.statsInterval(val, TimeUnit.SECONDS));
108+
map.from(clientProperties::getKeepAliveInterval).asInt(Duration::toMillis).to(clientBuilder,
109+
(cb, val) -> cb.keepAliveInterval(val, TimeUnit.MILLISECONDS));
110+
map.from(clientProperties::getConnectionTimeout).asInt(Duration::toMillis).to(clientBuilder,
111+
(cb, val) -> cb.connectionTimeout(val, TimeUnit.MILLISECONDS));
112+
map.from(clientProperties::getOperationTimeout).asInt(Duration::toMillis).to(clientBuilder,
113+
(cb, val) -> cb.operationTimeout(val, TimeUnit.MILLISECONDS));
114+
map.from(clientProperties::getLookupTimeout).asInt(Duration::toMillis).to(clientBuilder,
115+
(cb, val) -> cb.lookupTimeout(val, TimeUnit.MILLISECONDS));
116+
map.from(clientProperties::getInitialBackoffInterval).as(Duration::toMillis).to(clientBuilder,
117+
(cb, val) -> cb.startingBackoffInterval(val, TimeUnit.MILLISECONDS));
118+
map.from(clientProperties::getMaxBackoffInterval).as(Duration::toMillis).to(clientBuilder,
119+
(cb, val) -> cb.maxBackoffInterval(val, TimeUnit.MILLISECONDS));
120+
map.from(clientProperties::getEnableBusyWait).to(clientBuilder::enableBusyWait);
121+
map.from(clientProperties::getMemoryLimit).as(DataSize::toBytes).to(clientBuilder,
122+
(cb, val) -> cb.memoryLimit(val, SizeUnit.BYTES));
123+
map.from(clientProperties::getEnableTransaction).to(clientBuilder::enableTransaction);
124+
map.from(clientProperties::getProxyServiceUrl)
125+
.to((proxyUrl) -> clientBuilder.proxyServiceUrl(proxyUrl, clientProperties.getProxyProtocol()));
126+
map.from(clientProperties::getDnsLookupBindAddress)
127+
.to((bindAddr) -> clientBuilder.dnsLookupBind(bindAddr, clientProperties.getDnsLookupBindPort()));
128+
map.from(clientProperties::getSocks5ProxyAddress).as(this::parseSocketAddress)
129+
.to(clientBuilder::socks5ProxyAddress);
130+
map.from(clientProperties::getSocks5ProxyUsername).to(clientBuilder::socks5ProxyUsername);
131+
map.from(clientProperties::getSocks5ProxyPassword).to(clientBuilder::socks5ProxyPassword);
132+
}
133+
134+
private void applyAuthentication(Client clientProperties, ClientBuilder clientBuilder) {
135+
if (StringUtils.hasText(clientProperties.getAuthParams())
136+
&& !CollectionUtils.isEmpty(clientProperties.getAuthentication())) {
137+
throw new IllegalArgumentException(
138+
"Cannot set both spring.pulsar.client.authParams and spring.pulsar.client.authentication.*");
139+
}
140+
var authPluginClass = clientProperties.getAuthPluginClassName();
141+
if (StringUtils.hasText(authPluginClass)) {
142+
var authParams = clientProperties.getAuthParams();
143+
if (clientProperties.getAuthentication() != null) {
144+
authParams = AuthParameterUtils.maybeConvertToEncodedParamString(clientProperties.getAuthentication());
145+
}
146+
try {
147+
clientBuilder.authentication(authPluginClass, authParams);
148+
}
149+
catch (PulsarClientException.UnsupportedAuthenticationException ex) {
150+
throw new IllegalArgumentException("Unable to configure authentication: " + ex.getMessage(), ex);
151+
}
152+
}
153+
}
154+
155+
private InetSocketAddress parseSocketAddress(String address) {
156+
try {
157+
var uri = URI.create(address);
158+
return new InetSocketAddress(uri.getHost(), uri.getPort());
159+
}
160+
catch (Exception e) {
161+
throw new IllegalArgumentException("Invalid address: " + e.getMessage(), e);
162+
}
163+
}
164+
165+
protected void applyCustomizers(List<PulsarClientBuilderCustomizer> clientBuilderCustomizers,
166+
ClientBuilder clientBuilder) {
167+
LambdaSafe.callbacks(PulsarClientBuilderCustomizer.class, clientBuilderCustomizers, clientBuilder)
168+
.withLogger(PulsarClientBuilderConfigurer.class)
169+
.invoke((customizer) -> customizer.customize(clientBuilder));
170+
}
171+
172+
}

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,6 @@ public Map<String, Object> buildConsumerProperties() {
110110
return new HashMap<>(this.consumer.buildProperties());
111111
}
112112

113-
public Map<String, Object> buildClientProperties() {
114-
return new HashMap<>(this.client.buildProperties());
115-
}
116-
117113
public Map<String, Object> buildProducerProperties() {
118114
return new HashMap<>(this.producer.buildProperties());
119115
}
@@ -773,67 +769,6 @@ public void setSocks5ProxyPassword(String socks5ProxyPassword) {
773769
this.socks5ProxyPassword = socks5ProxyPassword;
774770
}
775771

776-
public Map<String, Object> buildProperties() {
777-
if (StringUtils.hasText(this.getAuthParams()) && !CollectionUtils.isEmpty(this.getAuthentication())) {
778-
throw new IllegalArgumentException(
779-
"Cannot set both spring.pulsar.client.authParams and spring.pulsar.client.authentication.*");
780-
}
781-
782-
PulsarProperties.Properties properties = new Properties();
783-
784-
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
785-
map.from(this::getServiceUrl).to(properties.in("serviceUrl"));
786-
map.from(this::getListenerName).to(properties.in("listenerName"));
787-
map.from(this::getAuthPluginClassName).to(properties.in("authPluginClassName"));
788-
map.from(this::getAuthParams).to(properties.in("authParams"));
789-
map.from(this::getAuthentication).as(AuthParameterUtils::maybeConvertToEncodedParamString)
790-
.to(properties.in("authParams"));
791-
map.from(this::getOperationTimeout).as(Duration::toMillis).to(properties.in("operationTimeoutMs"));
792-
map.from(this::getLookupTimeout).as(Duration::toMillis).to(properties.in("lookupTimeoutMs"));
793-
map.from(this::getNumIoThreads).to(properties.in("numIoThreads"));
794-
map.from(this::getNumListenerThreads).to(properties.in("numListenerThreads"));
795-
map.from(this::getNumConnectionsPerBroker).to(properties.in("connectionsPerBroker"));
796-
map.from(this::getUseTcpNoDelay).to(properties.in("useTcpNoDelay"));
797-
map.from(this::getUseTls).to(properties.in("useTls"));
798-
map.from(this::getTlsHostnameVerificationEnable).to(properties.in("tlsHostnameVerificationEnable"));
799-
map.from(this::getTlsTrustCertsFilePath).to(properties.in("tlsTrustCertsFilePath"));
800-
map.from(this::getTlsCertificateFilePath).to(properties.in("tlsCertificateFilePath"));
801-
map.from(this::getTlsKeyFilePath).to(properties.in("tlsKeyFilePath"));
802-
map.from(this::getTlsAllowInsecureConnection).to(properties.in("tlsAllowInsecureConnection"));
803-
map.from(this::getUseKeyStoreTls).to(properties.in("useKeyStoreTls"));
804-
map.from(this::getSslProvider).to(properties.in("sslProvider"));
805-
map.from(this::getTlsTrustStoreType).to(properties.in("tlsTrustStoreType"));
806-
map.from(this::getTlsTrustStorePath).to(properties.in("tlsTrustStorePath"));
807-
map.from(this::getTlsTrustStorePassword).to(properties.in("tlsTrustStorePassword"));
808-
map.from(this::getTlsCiphers).to(properties.in("tlsCiphers"));
809-
map.from(this::getTlsProtocols).to(properties.in("tlsProtocols"));
810-
map.from(this::getStatsInterval).as(Duration::toSeconds).to(properties.in("statsIntervalSeconds"));
811-
map.from(this::getMaxConcurrentLookupRequest).to(properties.in("concurrentLookupRequest"));
812-
map.from(this::getMaxLookupRequest).to(properties.in("maxLookupRequest"));
813-
map.from(this::getMaxLookupRedirects).to(properties.in("maxLookupRedirects"));
814-
map.from(this::getMaxNumberOfRejectedRequestPerConnection)
815-
.to(properties.in("maxNumberOfRejectedRequestPerConnection"));
816-
map.from(this::getKeepAliveInterval).asInt(Duration::toSeconds)
817-
.to(properties.in("keepAliveIntervalSeconds"));
818-
map.from(this::getConnectionTimeout).asInt(Duration::toMillis).to(properties.in("connectionTimeoutMs"));
819-
map.from(this::getRequestTimeout).asInt(Duration::toMillis).to(properties.in("requestTimeoutMs"));
820-
map.from(this::getInitialBackoffInterval).as(Duration::toNanos)
821-
.to(properties.in("initialBackoffIntervalNanos"));
822-
map.from(this::getMaxBackoffInterval).as(Duration::toNanos).to(properties.in("maxBackoffIntervalNanos"));
823-
map.from(this::getEnableBusyWait).to(properties.in("enableBusyWait"));
824-
map.from(this::getMemoryLimit).as(DataSize::toBytes).to(properties.in("memoryLimitBytes"));
825-
map.from(this::getProxyServiceUrl).to(properties.in("proxyServiceUrl"));
826-
map.from(this::getProxyProtocol).to(properties.in("proxyProtocol"));
827-
map.from(this::getEnableTransaction).to(properties.in("enableTransaction"));
828-
map.from(this::getDnsLookupBindAddress).to(properties.in("dnsLookupBindAddress"));
829-
map.from(this::getDnsLookupBindPort).to(properties.in("dnsLookupBindPort"));
830-
map.from(this::getSocks5ProxyAddress).to(properties.in("socks5ProxyAddress"));
831-
map.from(this::getSocks5ProxyUsername).to(properties.in("socks5ProxyUsername"));
832-
map.from(this::getSocks5ProxyPassword).to(properties.in("socks5ProxyPassword"));
833-
834-
return properties;
835-
}
836-
837772
}
838773

839774
public static class Function {

0 commit comments

Comments
 (0)