Skip to content

Commit d9af7ce

Browse files
committed
Merge pull request #37197 from onobc
* pr/37197: Polish 'Add Pulsar ConnectionDetails support' Add Pulsar ConnectionDetails support Closes gh-37197
2 parents bced103 + 750c597 commit d9af7ce

File tree

22 files changed

+537
-26
lines changed

22 files changed

+537
-26
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
/**
20+
* Adapts {@link PulsarProperties} to {@link PulsarConnectionDetails}.
21+
*
22+
* @author Chris Bono
23+
*/
24+
class PropertiesPulsarConnectionDetails implements PulsarConnectionDetails {
25+
26+
private final PulsarProperties pulsarProperties;
27+
28+
PropertiesPulsarConnectionDetails(PulsarProperties pulsarProperties) {
29+
this.pulsarProperties = pulsarProperties;
30+
}
31+
32+
@Override
33+
public String getBrokerUrl() {
34+
return this.pulsarProperties.getClient().getServiceUrl();
35+
}
36+
37+
@Override
38+
public String getAdminUrl() {
39+
return this.pulsarProperties.getAdmin().getServiceUrl();
40+
}
41+
42+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,18 @@ class PulsarConfiguration {
7171
this.propertiesMapper = new PulsarPropertiesMapper(properties);
7272
}
7373

74+
@Bean
75+
@ConditionalOnMissingBean(PulsarConnectionDetails.class)
76+
PropertiesPulsarConnectionDetails pulsarConnectionDetails() {
77+
return new PropertiesPulsarConnectionDetails(this.properties);
78+
}
79+
7480
@Bean
7581
@ConditionalOnMissingBean(PulsarClientFactory.class)
76-
DefaultPulsarClientFactory pulsarClientFactory(ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) {
82+
DefaultPulsarClientFactory pulsarClientFactory(PulsarConnectionDetails connectionDetails,
83+
ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) {
7784
List<PulsarClientBuilderCustomizer> allCustomizers = new ArrayList<>();
78-
allCustomizers.add(this.propertiesMapper::customizeClientBuilder);
85+
allCustomizers.add((builder) -> this.propertiesMapper.customizeClientBuilder(builder, connectionDetails));
7986
allCustomizers.addAll(customizersProvider.orderedStream().toList());
8087
DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory(
8188
(clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder));
@@ -95,10 +102,10 @@ PulsarClient pulsarClient(PulsarClientFactory clientFactory) throws PulsarClient
95102

96103
@Bean
97104
@ConditionalOnMissingBean
98-
PulsarAdministration pulsarAdministration(
105+
PulsarAdministration pulsarAdministration(PulsarConnectionDetails connectionDetails,
99106
ObjectProvider<PulsarAdminBuilderCustomizer> pulsarAdminBuilderCustomizers) {
100107
List<PulsarAdminBuilderCustomizer> allCustomizers = new ArrayList<>();
101-
allCustomizers.add(this.propertiesMapper::customizeAdminBuilder);
108+
allCustomizers.add((builder) -> this.propertiesMapper.customizeAdminBuilder(builder, connectionDetails));
102109
allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList());
103110
return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder));
104111
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
20+
21+
/**
22+
* Details required to establish a connection to a Pulsar service.
23+
*
24+
* @author Chris Bono
25+
* @since 3.2.0
26+
*/
27+
public interface PulsarConnectionDetails extends ConnectionDetails {
28+
29+
/**
30+
* URL used to connect to the broker.
31+
* @return the service URL
32+
*/
33+
String getBrokerUrl();
34+
35+
/**
36+
* URL user to connect to the admin endpoint.
37+
* @return the admin URL
38+
*/
39+
String getAdminUrl();
40+
41+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ final class PulsarPropertiesMapper {
4949
this.properties = properties;
5050
}
5151

52-
void customizeClientBuilder(ClientBuilder clientBuilder) {
52+
void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails connectionDetails) {
5353
PulsarProperties.Client properties = this.properties.getClient();
5454
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
55-
map.from(properties::getServiceUrl).to(clientBuilder::serviceUrl);
55+
map.from(connectionDetails::getBrokerUrl).to(clientBuilder::serviceUrl);
5656
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
5757
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
5858
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
5959
customizeAuthentication(clientBuilder::authentication, properties.getAuthentication());
6060
}
6161

62-
void customizeAdminBuilder(PulsarAdminBuilder adminBuilder) {
62+
void customizeAdminBuilder(PulsarAdminBuilder adminBuilder, PulsarConnectionDetails connectionDetails) {
6363
PulsarProperties.Admin properties = this.properties.getAdmin();
6464
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
65-
map.from(properties::getServiceUrl).to(adminBuilder::serviceHttpUrl);
65+
map.from(connectionDetails::getAdminUrl).to(adminBuilder::serviceHttpUrl);
6666
map.from(properties::getConnectionTimeout).to(timeoutProperty(adminBuilder::connectionTimeout));
6767
map.from(properties::getReadTimeout).to(timeoutProperty(adminBuilder::readTimeout));
6868
map.from(properties::getRequestTimeout).to(timeoutProperty(adminBuilder::requestTimeout));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.junit.jupiter.api.Test;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
/**
24+
* Tests for {@link PropertiesPulsarConnectionDetails}.
25+
*
26+
* @author Chris Bono
27+
*/
28+
class PropertiesPulsarConnectionDetailsTests {
29+
30+
@Test
31+
void getClientServiceUrlReturnsValueFromProperties() {
32+
PulsarProperties properties = new PulsarProperties();
33+
properties.getClient().setServiceUrl("foo");
34+
PulsarConnectionDetails connectionDetails = new PropertiesPulsarConnectionDetails(properties);
35+
assertThat(connectionDetails.getBrokerUrl()).isEqualTo("foo");
36+
}
37+
38+
@Test
39+
void getAdminServiceHttpUrlReturnsValueFromProperties() {
40+
PulsarProperties properties = new PulsarProperties();
41+
properties.getAdmin().setServiceUrl("foo");
42+
PulsarConnectionDetails connectionDetails = new PropertiesPulsarConnectionDetails(properties);
43+
assertThat(connectionDetails.getAdminUrl()).isEqualTo("foo");
44+
}
45+
46+
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ void whenCustomPulsarReaderAnnotationProcessorDefinedAutoConfigurationIsSkipped(
114114
@Test
115115
void autoConfiguresBeans() {
116116
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class)
117+
.hasSingleBean(PulsarConnectionDetails.class)
117118
.hasSingleBean(DefaultPulsarClientFactory.class)
118119
.hasSingleBean(PulsarClient.class)
119120
.hasSingleBean(PulsarAdministration.class)

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
import static org.assertj.core.api.Assertions.assertThat;
5353
import static org.assertj.core.api.Assertions.entry;
54+
import static org.mockito.BDDMockito.given;
5455
import static org.mockito.Mockito.mock;
5556

5657
/**
@@ -67,6 +68,15 @@ class PulsarConfigurationTests {
6768
.withConfiguration(AutoConfigurations.of(PulsarConfiguration.class))
6869
.withBean(PulsarClient.class, () -> mock(PulsarClient.class));
6970

71+
@Test
72+
void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() {
73+
PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class);
74+
this.contextRunner
75+
.withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails)
76+
.run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class)
77+
.isSameAs(customConnectionDetails));
78+
}
79+
7080
@Nested
7181
class ClientTests {
7282

@@ -88,15 +98,18 @@ void whenHasUserDefinedClientBeanDoesNotAutoConfigureBean() {
8898

8999
@Test
90100
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
101+
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
102+
given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails");
91103
PulsarConfigurationTests.this.contextRunner
92104
.withUserConfiguration(PulsarClientBuilderCustomizersConfig.class)
93-
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
105+
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
106+
.withPropertyValues("spring.pulsar.client.service-url=properties")
94107
.run((context) -> {
95108
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
96109
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
97110
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
98111
assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder(
99-
ClientBuilder::serviceUrl, "fromPropsCustomizer", "fromCustomizer1", "fromCustomizer2");
112+
ClientBuilder::serviceUrl, "connectiondetails", "fromCustomizer1", "fromCustomizer2");
100113
});
101114
}
102115

@@ -135,14 +148,17 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
135148

136149
@Test
137150
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
151+
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
152+
given(connectionDetails.getAdminUrl()).willReturn("connectiondetails");
138153
this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class)
139-
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
154+
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
155+
.withPropertyValues("spring.pulsar.admin.service-url=property")
140156
.run((context) -> {
141157
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
142158
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
143159
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
144160
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder(
145-
PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer", "fromCustomizer1",
161+
PulsarAdminBuilder::serviceHttpUrl, "connectiondetails", "fromCustomizer1",
146162
"fromCustomizer2");
147163
});
148164
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.pulsar.listener.PulsarContainerProperties;
4242

4343
import static org.assertj.core.api.Assertions.assertThat;
44+
import static org.mockito.BDDMockito.given;
4445
import static org.mockito.BDDMockito.then;
4546
import static org.mockito.Mockito.mock;
4647

@@ -60,7 +61,8 @@ void customizeClientBuilderWhenHasNoAuthentication() {
6061
properties.getClient().setOperationTimeout(Duration.ofSeconds(2));
6162
properties.getClient().setLookupTimeout(Duration.ofSeconds(3));
6263
ClientBuilder builder = mock(ClientBuilder.class);
63-
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder);
64+
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
65+
new PropertiesPulsarConnectionDetails(properties));
6466
then(builder).should().serviceUrl("https://example.com");
6567
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
6668
then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS);
@@ -74,10 +76,22 @@ void customizeClientBuilderWhenHasAuthentication() throws UnsupportedAuthenticat
7476
properties.getClient().getAuthentication().setPluginClassName("myclass");
7577
properties.getClient().getAuthentication().setParam(params);
7678
ClientBuilder builder = mock(ClientBuilder.class);
77-
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder);
79+
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
80+
new PropertiesPulsarConnectionDetails(properties));
7881
then(builder).should().authentication("myclass", params);
7982
}
8083

84+
@Test
85+
void customizeClientBuilderWhenHasConnectionDetails() {
86+
PulsarProperties properties = new PulsarProperties();
87+
properties.getClient().setServiceUrl("https://ignored.example.com");
88+
ClientBuilder builder = mock(ClientBuilder.class);
89+
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
90+
given(connectionDetails.getBrokerUrl()).willReturn("https://used.example.com");
91+
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, connectionDetails);
92+
then(builder).should().serviceUrl("https://used.example.com");
93+
}
94+
8195
@Test
8296
void customizeAdminBuilderWhenHasNoAuthentication() {
8397
PulsarProperties properties = new PulsarProperties();
@@ -86,7 +100,8 @@ void customizeAdminBuilderWhenHasNoAuthentication() {
86100
properties.getAdmin().setReadTimeout(Duration.ofSeconds(2));
87101
properties.getAdmin().setRequestTimeout(Duration.ofSeconds(3));
88102
PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
89-
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder);
103+
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder,
104+
new PropertiesPulsarConnectionDetails(properties));
90105
then(builder).should().serviceHttpUrl("https://example.com");
91106
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
92107
then(builder).should().readTimeout(2000, TimeUnit.MILLISECONDS);
@@ -100,10 +115,22 @@ void customizeAdminBuilderWhenHasAuthentication() throws UnsupportedAuthenticati
100115
properties.getAdmin().getAuthentication().setPluginClassName("myclass");
101116
properties.getAdmin().getAuthentication().setParam(params);
102117
PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
103-
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder);
118+
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder,
119+
new PropertiesPulsarConnectionDetails(properties));
104120
then(builder).should().authentication("myclass", params);
105121
}
106122

123+
@Test
124+
void customizeAdminBuilderWhenHasConnectionDetails() {
125+
PulsarProperties properties = new PulsarProperties();
126+
properties.getAdmin().setServiceUrl("https://ignored.example.com");
127+
PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
128+
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
129+
given(connectionDetails.getAdminUrl()).willReturn("https://used.example.com");
130+
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder, connectionDetails);
131+
then(builder).should().serviceHttpUrl("https://used.example.com");
132+
}
133+
107134
@Test
108135
@SuppressWarnings("unchecked")
109136
void customizeProducerBuilder() {

0 commit comments

Comments
 (0)