Skip to content

Add ConnectionDetails and ServiceConnection support to Pulsar #37197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spring-boot-project/spring-boot-autoconfigure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,5 @@ tasks.named("checkSpringConfigurationMetadata").configure {

test {
jvmArgs += "--add-opens=java.base/java.net=ALL-UNNAMED"
jvmArgs += "--add-opens=java.base/sun.net=ALL-UNNAMED"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.pulsar;

/**
* Adapts {@link PulsarProperties} to {@link PulsarConnectionDetails}.
*
* @author Chris Bono
*/
class PropertiesPulsarConnectionDetails implements PulsarConnectionDetails {

private final PulsarProperties pulsarProperties;

PropertiesPulsarConnectionDetails(PulsarProperties pulsarProperties) {
this.pulsarProperties = pulsarProperties;
}

@Override
public String getPulsarBrokerUrl() {
return this.pulsarProperties.getClient().getServiceUrl();
}

@Override
public String getPulsarAdminUrl() {
return this.pulsarProperties.getAdmin().getServiceUrl();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,31 @@ class PulsarConfiguration {
this.propertiesMapper = new PulsarPropertiesMapper(properties);
}

@Bean
@ConditionalOnMissingBean(PulsarConnectionDetails.class)
PropertiesPulsarConnectionDetails pulsarConnectionDetails() {
return new PropertiesPulsarConnectionDetails(this.properties);
}

@Bean
@ConditionalOnMissingBean(PulsarClientFactory.class)
DefaultPulsarClientFactory pulsarClientFactory(ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) {
DefaultPulsarClientFactory pulsarClientFactory(PulsarConnectionDetails connectionDetails,
ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) {
List<PulsarClientBuilderCustomizer> allCustomizers = new ArrayList<>();
allCustomizers.add(this.propertiesMapper::customizeClientBuilder);
allCustomizers.add((clientBuilder) -> this.applyConnectionDetails(connectionDetails, clientBuilder));
allCustomizers.addAll(customizersProvider.orderedStream().toList());
DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory(
(clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder));
return clientFactory;
}

private void applyConnectionDetails(PulsarConnectionDetails connectionDetails, ClientBuilder clientBuilder) {
if (connectionDetails.getPulsarBrokerUrl() != null) {
clientBuilder.serviceUrl(connectionDetails.getPulsarBrokerUrl());
}
}

private void applyClientBuilderCustomizers(List<PulsarClientBuilderCustomizer> customizers,
ClientBuilder clientBuilder) {
customizers.forEach((customizer) -> customizer.customize(clientBuilder));
Expand All @@ -95,14 +109,21 @@ PulsarClient pulsarClient(PulsarClientFactory clientFactory) throws PulsarClient

@Bean
@ConditionalOnMissingBean
PulsarAdministration pulsarAdministration(
PulsarAdministration pulsarAdministration(PulsarConnectionDetails connectionDetails,
ObjectProvider<PulsarAdminBuilderCustomizer> pulsarAdminBuilderCustomizers) {
List<PulsarAdminBuilderCustomizer> allCustomizers = new ArrayList<>();
allCustomizers.add(this.propertiesMapper::customizeAdminBuilder);
allCustomizers.add((adminBuilder) -> this.applyConnectionDetails(connectionDetails, adminBuilder));
allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList());
return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder));
}

private void applyConnectionDetails(PulsarConnectionDetails connectionDetails, PulsarAdminBuilder adminBuilder) {
if (connectionDetails.getPulsarAdminUrl() != null) {
adminBuilder.serviceHttpUrl(connectionDetails.getPulsarAdminUrl());
}
}

private void applyAdminBuilderCustomizers(List<PulsarAdminBuilderCustomizer> customizers,
PulsarAdminBuilder adminBuilder) {
customizers.forEach((customizer) -> customizer.customize(adminBuilder));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.pulsar;

import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;

/**
* Details required to establish a connection to a Pulsar service.
*
* @author Chris Bono
* @since 3.2.0
*/
public interface PulsarConnectionDetails extends ConnectionDetails {

/**
* Returns the Pulsar service URL for the broker.
* @return the Pulsar service URL for the broker
*/
String getPulsarBrokerUrl();

/**
* Returns the Pulsar web URL for the admin endpoint.
* @return the Pulsar web URL for the admin endpoint
*/
String getPulsarAdminUrl();

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ void customizeClientBuilder(ClientBuilder clientBuilder) {
PulsarProperties.Client properties = this.properties.getClient();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getServiceUrl).to(clientBuilder::serviceUrl);

map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.pulsar;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@link PropertiesPulsarConnectionDetails}.
*
* @author Chris Bono
*/
class PropertiesPulsarConnectionDetailsTests {

@Test
void pulsarBrokerUrlIsObtainedFromPulsarProperties() {
var pulsarProps = new PulsarProperties();
pulsarProps.getClient().setServiceUrl("foo");
var connectionDetails = new PropertiesPulsarConnectionDetails(pulsarProps);
assertThat(connectionDetails.getPulsarBrokerUrl()).isEqualTo("foo");
}

@Test
void pulsarAdminUrlIsObtainedFromPulsarProperties() {
var pulsarProps = new PulsarProperties();
pulsarProps.getAdmin().setServiceUrl("foo");
var connectionDetails = new PropertiesPulsarConnectionDetails(pulsarProps);
assertThat(connectionDetails.getPulsarAdminUrl()).isEqualTo("foo");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void whenCustomPulsarReaderAnnotationProcessorDefinedAutoConfigurationIsSkipped(
@Test
void autoConfiguresBeans() {
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class)
.hasSingleBean(PulsarConnectionDetails.class)
.hasSingleBean(DefaultPulsarClientFactory.class)
.hasSingleBean(PulsarClient.class)
.hasSingleBean(PulsarAdministration.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
Expand All @@ -67,6 +68,15 @@ class PulsarConfigurationTests {
.withConfiguration(AutoConfigurations.of(PulsarConfiguration.class))
.withBean(PulsarClient.class, () -> mock(PulsarClient.class));

@Test
void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() {
PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class);
this.contextRunner
.withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails)
.run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class)
.isSameAs(customConnectionDetails));
}

@Nested
class ClientTests {

Expand All @@ -86,17 +96,36 @@ void whenHasUserDefinedClientBeanDoesNotAutoConfigureBean() {
.run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient));
}

@Test
void whenConnectionDetailsAreNullTheyAreNotApplied() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarBrokerUrl()).willReturn(null);
PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
.run((context) -> {
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
assertThat(customizers.fromField(clientFactory, "customizer"))
.callsInOrder(ClientBuilder::serviceUrl, "fromPropsCustomizer");
});
}

@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarBrokerUrl()).willReturn("fromConnectionDetailsCustomizer");
PulsarConfigurationTests.this.contextRunner
.withUserConfiguration(PulsarClientBuilderCustomizersConfig.class)
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
.run((context) -> {
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder(
ClientBuilder::serviceUrl, "fromPropsCustomizer", "fromCustomizer1", "fromCustomizer2");
ClientBuilder::serviceUrl, "fromPropsCustomizer", "fromConnectionDetailsCustomizer",
"fromCustomizer1", "fromCustomizer2");
});
}

Expand Down Expand Up @@ -133,17 +162,35 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
.isSameAs(pulsarAdministration));
}

@Test
void whenConnectionDetailsAreNullTheyAreNotApplied() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarAdminUrl()).willReturn(null);
PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
.run((context) -> {
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers"))
.callsInOrder(PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer");
});
}

@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarAdminUrl()).willReturn("fromConnectionDetailsCustomizer");
this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class)
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
.run((context) -> {
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder(
PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer", "fromCustomizer1",
"fromCustomizer2");
PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer",
"fromConnectionDetailsCustomizer", "fromCustomizer1", "fromCustomizer2");
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.docker.compose.service.connection.pulsar;

import org.springframework.boot.autoconfigure.pulsar.PulsarConnectionDetails;
import org.springframework.boot.docker.compose.core.RunningService;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource;

/**
* {@link DockerComposeConnectionDetailsFactory} to create {@link PulsarConnectionDetails}
* for a {@code pulsar} service.
*
* @author Chris Bono
*/
class PulsarDockerComposeConnectionDetailsFactory
extends DockerComposeConnectionDetailsFactory<PulsarConnectionDetails> {

private static final int PULSAR_BROKER_PORT = 6650;

private static final int PULSAR_ADMIN_PORT = 8080;

PulsarDockerComposeConnectionDetailsFactory() {
super("apachepulsar/pulsar");
}

@Override
protected PulsarConnectionDetails getDockerComposeConnectionDetails(DockerComposeConnectionSource source) {
return new PulsarDockerComposeConnectionDetails(source.getRunningService());
}

/**
* {@link PulsarConnectionDetails} backed by a {@code pulsar} {@link RunningService}.
*/
static class PulsarDockerComposeConnectionDetails extends DockerComposeConnectionDetails
implements PulsarConnectionDetails {

private final String brokerUrl;

private final String adminUrl;

PulsarDockerComposeConnectionDetails(RunningService service) {
super(service);
this.brokerUrl = "pulsar://%s:%s".formatted(service.host(), service.ports().get(PULSAR_BROKER_PORT));
this.adminUrl = "http://%s:%s".formatted(service.host(), service.ports().get(PULSAR_ADMIN_PORT));
}

@Override
public String getPulsarBrokerUrl() {
return this.brokerUrl;
}

@Override
public String getPulsarAdminUrl() {
return this.adminUrl;
}

}

}
Loading